feat(core): Build workflow index on server start and workflow updates (#21441)

This commit is contained in:
mfsiega 2025-11-03 11:01:12 +01:00 committed by GitHub
parent 440e83bdfc
commit 6df508fa1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 223 additions and 1 deletions

View File

@ -18,4 +18,8 @@ export class WorkflowsConfig {
/** How many workflows to activate simultaneously during startup. */
@Env('N8N_WORKFLOW_ACTIVATION_BATCH_SIZE')
activationBatchSize: number = 1;
/** Whether to enable workflow dependency indexing */
@Env('N8N_WORKFLOWS_INDEXING_ENABLED')
indexingEnabled: boolean = false;
}

View File

@ -179,6 +179,7 @@ describe('GlobalConfig', () => {
defaultName: 'My workflow',
callerPolicyDefaultOption: 'workflowsFromSameOwner',
activationBatchSize: 1,
indexingEnabled: false,
},
endpoints: {
metrics: {

View File

@ -7,6 +7,7 @@ import { ErrorReporter } from 'n8n-core';
import type { INode, IWorkflowBase } from 'n8n-workflow';
import { WorkflowIndexService } from '../workflow-index.service';
import { EventService } from '@/events/event.service';
describe('WorkflowIndexService', () => {
let service: WorkflowIndexService;
@ -14,6 +15,7 @@ describe('WorkflowIndexService', () => {
const mockWorkflowRepository = mockInstance(WorkflowRepository);
const mockLogger = mockInstance(Logger);
const mockErrorReporter = mockInstance(ErrorReporter);
const mockEventService = mockInstance(EventService);
beforeEach(() => {
jest.resetAllMocks();
@ -21,6 +23,7 @@ describe('WorkflowIndexService', () => {
service = new WorkflowIndexService(
mockWorkflowDependencyRepository,
mockWorkflowRepository,
mockEventService,
mockLogger,
mockErrorReporter,
);
@ -240,6 +243,18 @@ describe('WorkflowIndexService', () => {
});
});
describe('init()', () => {
it('should register event listeners for workflow events', () => {
service.init();
expect(mockEventService.on).toHaveBeenCalledTimes(4);
expect(mockEventService.on).toHaveBeenCalledWith('server-started', expect.any(Function));
expect(mockEventService.on).toHaveBeenCalledWith('workflow-created', expect.any(Function));
expect(mockEventService.on).toHaveBeenCalledWith('workflow-saved', expect.any(Function));
expect(mockEventService.on).toHaveBeenCalledWith('workflow-deleted', expect.any(Function));
});
});
describe('buildIndex()', () => {
it('should retrieve unindexed workflows and update their dependencies', async () => {
const workflow1 = createWorkflowEntity([
@ -282,6 +297,7 @@ describe('WorkflowIndexService', () => {
const serviceWithSmallBatch = new WorkflowIndexService(
mockWorkflowDependencyRepository,
mockWorkflowRepository,
mockEventService,
mockLogger,
mockErrorReporter,
batchSize,

View File

@ -4,6 +4,8 @@ import { Service } from '@n8n/di';
import { ErrorReporter } from 'n8n-core';
import { ensureError, INode, IWorkflowBase } from 'n8n-workflow';
import { EventService } from '@/events/event.service';
// A safety limit to prevent infinite loops in indexing.
const LOOP_LIMIT = 1_000_000_000;
@ -20,11 +22,28 @@ export class WorkflowIndexService {
constructor(
private readonly dependencyRepository: WorkflowDependencyRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly eventService: EventService,
private readonly logger: Logger,
private readonly errorReporter: ErrorReporter,
private readonly batchSize = 100,
) {}
init() {
this.eventService.on('server-started', async (): Promise<void> => {
this.logger.info('Building workflow dependency index...');
await this.buildIndex().catch((e) => this.errorReporter.error(e));
});
this.eventService.on('workflow-created', async ({ workflow }) => {
await this.updateIndexFor(workflow);
});
this.eventService.on('workflow-saved', async ({ workflow }) => {
await this.updateIndexFor(workflow);
});
this.eventService.on('workflow-deleted', async ({ workflowId }) => {
await this.dependencyRepository.removeDependenciesForWorkflow(workflowId);
});
}
async buildIndex() {
const batchSize = this.batchSize;
let processedCount = 0;

View File

@ -1,5 +1,5 @@
import { inDevelopment, inProduction } from '@n8n/backend-common';
import { SecurityConfig } from '@n8n/config';
import { DatabaseConfig, SecurityConfig, WorkflowsConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import type { APIRequest, AuthenticatedRequest } from '@n8n/db';
import { Container, Service } from '@n8n/di';
@ -278,6 +278,11 @@ export class Server extends AbstractServer {
await eventBus.initialize();
Container.get(LogStreamingEventRelay).init();
// ----------------------------------------
// Workflow Indexing Setup
// ----------------------------------------
await this.initializeWorkflowIndexing();
if (this.endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
const overwriteEndpointMiddleware =
@ -488,6 +493,21 @@ export class Server extends AbstractServer {
}
}
private async initializeWorkflowIndexing() {
if (Container.get(WorkflowsConfig).indexingEnabled) {
if (Container.get(DatabaseConfig).isLegacySqlite) {
this.logger.warn(
'Workflow indexing is disabled because legacy Sqlite databases are not supported. Please migrate the database to enable workflow indexing.',
);
return;
}
const { WorkflowIndexService } = await import(
'@/modules/workflow-index/workflow-index.service'
);
Container.get(WorkflowIndexService).init();
}
}
protected setupPushServer(): void {
const { restEndpoint, server, app } = this;
Container.get(Push).setupPushServer(restEndpoint, server, app);

View File

@ -8,6 +8,8 @@ import type { Cipher } from 'n8n-core';
import { ImportService } from '../import.service';
import type { CredentialsRepository, TagRepository } from '@n8n/db';
import type { ActiveWorkflowManager } from '@/active-workflow-manager';
import type { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service';
import type { DatabaseConfig } from '@n8n/config';
// Mock fs/promises
jest.mock('fs/promises');
@ -38,6 +40,8 @@ describe('ImportService', () => {
let mockEntityManager: EntityManager;
let mockCipher: Cipher;
let mockActiveWorkflowManager: ActiveWorkflowManager;
let mockWorkflowIndexService: WorkflowIndexService;
let mockDatabaseConfig: DatabaseConfig;
beforeEach(() => {
jest.clearAllMocks();
@ -49,6 +53,8 @@ describe('ImportService', () => {
mockEntityManager = mock<EntityManager>();
mockCipher = mock<Cipher>();
mockActiveWorkflowManager = mock<ActiveWorkflowManager>();
mockWorkflowIndexService = mock<WorkflowIndexService>();
mockDatabaseConfig = mock<DatabaseConfig>();
// Set up cipher mock
mockCipher.decrypt = jest.fn((data: string) => data.replace('encrypted:', ''));
@ -94,6 +100,8 @@ describe('ImportService', () => {
mockDataSource,
mockCipher,
mockActiveWorkflowManager,
mockWorkflowIndexService,
mockDatabaseConfig,
);
});

View File

@ -21,6 +21,8 @@ import { Cipher } from 'n8n-core';
import { decompressFolder } from '@/utils/compression.util';
import { z } from 'zod';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service';
import { DatabaseConfig } from '@n8n/config';
@Service()
export class ImportService {
@ -52,6 +54,8 @@ export class ImportService {
private readonly dataSource: DataSource,
private readonly cipher: Cipher,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly workflowIndexService: WorkflowIndexService,
private readonly databaseConfig: DatabaseConfig,
) {}
async initRecords() {
@ -79,6 +83,7 @@ export class ImportService {
}
}
const insertedWorkflows: IWorkflowBase[] = [];
const { manager: dbManager } = this.credentialsRepository;
await dbManager.transaction(async (tx) => {
for (const workflow of workflows) {
@ -92,6 +97,7 @@ export class ImportService {
const upsertResult = await tx.upsert(WorkflowEntity, workflow, ['id']);
const workflowId = upsertResult.identifiers.at(0)?.id as string;
insertedWorkflows.push({ ...workflow, id: workflowId }); // Collect inserted workflow with correct ID, for indexing later.
const personalProject = await tx.findOneByOrFail(Project, { id: projectId });
@ -116,6 +122,15 @@ export class ImportService {
}
}
});
// Directly update the index for the important workflows, since they don't generate
// workflow-update events during import.
// Workflow indexing isn't supported on legacy SQLite.
if (!this.databaseConfig.isLegacySqlite) {
for (const workflow of insertedWorkflows) {
await this.workflowIndexService.updateIndexFor(workflow);
}
}
}
async replaceInvalidCreds(workflow: IWorkflowBase) {

View File

@ -6,6 +6,7 @@ import {
newWorkflow,
testDb,
} from '@n8n/backend-test-utils';
import { DatabaseConfig } from '@n8n/config';
import type { Project, User } from '@n8n/db';
import {
TagEntity,
@ -20,6 +21,7 @@ import type { INode } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import type { ActiveWorkflowManager } from '@/active-workflow-manager';
import type { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service';
import { ImportService } from '@/services/import.service';
import { createMember, createOwner } from './shared/db/users';
@ -30,6 +32,7 @@ describe('ImportService', () => {
let owner: User;
let ownerPersonalProject: Project;
let mockActiveWorkflowManager: ActiveWorkflowManager;
let mockWorkflowIndexService: WorkflowIndexService;
beforeAll(async () => {
await testDb.init();
@ -43,6 +46,8 @@ describe('ImportService', () => {
mockActiveWorkflowManager = mock<ActiveWorkflowManager>();
mockWorkflowIndexService = mock<WorkflowIndexService>();
importService = new ImportService(
mock(),
credentialsRepository,
@ -50,6 +55,8 @@ describe('ImportService', () => {
mock(),
mock(),
mockActiveWorkflowManager,
mockWorkflowIndexService,
Container.get(DatabaseConfig),
);
});
@ -71,6 +78,11 @@ describe('ImportService', () => {
if (!dbWorkflow) fail('Expected to find workflow');
expect(dbWorkflow.id).toBe(workflowToImport.id);
if (Container.get(DatabaseConfig).isLegacySqlite) {
expect(mockWorkflowIndexService.updateIndexFor).not.toHaveBeenCalled();
} else {
expect(mockWorkflowIndexService.updateIndexFor).toHaveBeenCalledWith(workflowToImport);
}
});
test('should make user owner of imported workflow', async () => {

View File

@ -0,0 +1,127 @@
import { Logger } from '@n8n/backend-common';
import { testDb } from '@n8n/backend-test-utils';
import { DatabaseConfig } from '@n8n/config';
import type { IWorkflowDb } from '@n8n/db';
import { WorkflowDependencyRepository, WorkflowRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import { retryUntil } from '@test-integration/retry-until';
import { ErrorReporter } from 'n8n-core';
import { v4 as uuid } from 'uuid';
import { createOwner } from '../shared/db/users';
import { EventService } from '@/events/event.service';
import { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service';
let workflowIndexService: WorkflowIndexService;
let eventService: EventService;
let workflowRepository: WorkflowRepository;
let workflowDependencyRepository: WorkflowDependencyRepository;
beforeAll(async () => {
await testDb.init();
// Get real instances from the container
workflowRepository = Container.get(WorkflowRepository);
workflowDependencyRepository = Container.get(WorkflowDependencyRepository);
eventService = Container.get(EventService);
// Create the WorkflowIndexService with real dependencies
workflowIndexService = new WorkflowIndexService(
workflowDependencyRepository,
workflowRepository,
eventService,
Container.get(Logger),
Container.get(ErrorReporter),
);
// Initialize the service to register event listeners
workflowIndexService.init();
});
afterEach(async () => {
await testDb.truncate(['WorkflowEntity', 'WorkflowDependency']);
});
afterAll(async () => {
await testDb.terminate();
});
describe('WorkflowIndexService Integration', () => {
if (Container.get(DatabaseConfig).isLegacySqlite) {
// NOTE: this feature isn't supported on legacy SQLite databases, so we skip the tests.
it('is not supported on legacy SQLite databases', async () => {});
return;
}
describe('workflow-created event', () => {
it('should index a new workflow with a single node', async () => {
// Arrange
const owner = await createOwner();
const workflowId = uuid();
const versionId = uuid();
const workflow = {
id: workflowId,
name: 'Test Workflow',
active: false,
versionCounter: 1,
versionId,
nodes: [
{
id: 'node-1',
name: 'HTTP Request',
type: 'n8n-nodes-base.httpRequest',
typeVersion: 1,
position: [250, 300] as [number, number],
parameters: {},
},
],
connections: {},
settings: {},
triggerCount: 0,
isArchived: false,
createdAt: new Date(),
updatedAt: new Date(),
} satisfies IWorkflowDb;
// Save the workflow to the database
const savedWorkflow = await workflowRepository.save(workflow);
// Act - emit the workflow-created event
eventService.emit('workflow-created', {
user: {
id: owner.id,
email: owner.email,
firstName: owner.firstName,
lastName: owner.lastName,
role: { slug: owner.role.slug },
},
workflow: savedWorkflow,
publicApi: false,
projectId: uuid(),
projectType: 'personal',
});
await retryUntil(async () => {
// Assert - check that dependencies were indexed in the database
const dependencies = await workflowDependencyRepository.find({
where: { workflowId },
});
expect(dependencies).toHaveLength(1);
expect(dependencies[0]).toMatchObject({
workflowId,
workflowVersionId: 1,
dependencyType: 'nodeType',
dependencyKey: 'n8n-nodes-base.httpRequest',
dependencyInfo: {
nodeId: 'node-1',
nodeVersion: 1,
},
indexVersionId: 1,
});
});
});
});
});