diff --git a/packages/@n8n/config/src/configs/workflows.config.ts b/packages/@n8n/config/src/configs/workflows.config.ts index bd50e1bb609..7fe9998d0b6 100644 --- a/packages/@n8n/config/src/configs/workflows.config.ts +++ b/packages/@n8n/config/src/configs/workflows.config.ts @@ -18,4 +18,8 @@ export class WorkflowsConfig { /** How many workflows to activate simultaneously during startup. */ @Env('N8N_WORKFLOW_ACTIVATION_BATCH_SIZE') activationBatchSize: number = 1; + + /** Whether to enable workflow dependency indexing */ + @Env('N8N_WORKFLOWS_INDEXING_ENABLED') + indexingEnabled: boolean = false; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 154b190b33f..1416e663849 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -179,6 +179,7 @@ describe('GlobalConfig', () => { defaultName: 'My workflow', callerPolicyDefaultOption: 'workflowsFromSameOwner', activationBatchSize: 1, + indexingEnabled: false, }, endpoints: { metrics: { diff --git a/packages/cli/src/modules/workflow-index/__tests__/workflow-index.service.test.ts b/packages/cli/src/modules/workflow-index/__tests__/workflow-index.service.test.ts index 81016418bca..e0c2cd4a9c5 100644 --- a/packages/cli/src/modules/workflow-index/__tests__/workflow-index.service.test.ts +++ b/packages/cli/src/modules/workflow-index/__tests__/workflow-index.service.test.ts @@ -7,6 +7,7 @@ import { ErrorReporter } from 'n8n-core'; import type { INode, IWorkflowBase } from 'n8n-workflow'; import { WorkflowIndexService } from '../workflow-index.service'; +import { EventService } from '@/events/event.service'; describe('WorkflowIndexService', () => { let service: WorkflowIndexService; @@ -14,6 +15,7 @@ describe('WorkflowIndexService', () => { const mockWorkflowRepository = mockInstance(WorkflowRepository); const mockLogger = mockInstance(Logger); const mockErrorReporter = mockInstance(ErrorReporter); + const mockEventService = mockInstance(EventService); beforeEach(() => { jest.resetAllMocks(); @@ -21,6 +23,7 @@ describe('WorkflowIndexService', () => { service = new WorkflowIndexService( mockWorkflowDependencyRepository, mockWorkflowRepository, + mockEventService, mockLogger, mockErrorReporter, ); @@ -240,6 +243,18 @@ describe('WorkflowIndexService', () => { }); }); + describe('init()', () => { + it('should register event listeners for workflow events', () => { + service.init(); + + expect(mockEventService.on).toHaveBeenCalledTimes(4); + expect(mockEventService.on).toHaveBeenCalledWith('server-started', expect.any(Function)); + expect(mockEventService.on).toHaveBeenCalledWith('workflow-created', expect.any(Function)); + expect(mockEventService.on).toHaveBeenCalledWith('workflow-saved', expect.any(Function)); + expect(mockEventService.on).toHaveBeenCalledWith('workflow-deleted', expect.any(Function)); + }); + }); + describe('buildIndex()', () => { it('should retrieve unindexed workflows and update their dependencies', async () => { const workflow1 = createWorkflowEntity([ @@ -282,6 +297,7 @@ describe('WorkflowIndexService', () => { const serviceWithSmallBatch = new WorkflowIndexService( mockWorkflowDependencyRepository, mockWorkflowRepository, + mockEventService, mockLogger, mockErrorReporter, batchSize, diff --git a/packages/cli/src/modules/workflow-index/workflow-index.service.ts b/packages/cli/src/modules/workflow-index/workflow-index.service.ts index b95b4f23649..3c079bd450a 100644 --- a/packages/cli/src/modules/workflow-index/workflow-index.service.ts +++ b/packages/cli/src/modules/workflow-index/workflow-index.service.ts @@ -4,6 +4,8 @@ import { Service } from '@n8n/di'; import { ErrorReporter } from 'n8n-core'; import { ensureError, INode, IWorkflowBase } from 'n8n-workflow'; +import { EventService } from '@/events/event.service'; + // A safety limit to prevent infinite loops in indexing. const LOOP_LIMIT = 1_000_000_000; @@ -20,11 +22,28 @@ export class WorkflowIndexService { constructor( private readonly dependencyRepository: WorkflowDependencyRepository, private readonly workflowRepository: WorkflowRepository, + private readonly eventService: EventService, private readonly logger: Logger, private readonly errorReporter: ErrorReporter, private readonly batchSize = 100, ) {} + init() { + this.eventService.on('server-started', async (): Promise => { + this.logger.info('Building workflow dependency index...'); + await this.buildIndex().catch((e) => this.errorReporter.error(e)); + }); + this.eventService.on('workflow-created', async ({ workflow }) => { + await this.updateIndexFor(workflow); + }); + this.eventService.on('workflow-saved', async ({ workflow }) => { + await this.updateIndexFor(workflow); + }); + this.eventService.on('workflow-deleted', async ({ workflowId }) => { + await this.dependencyRepository.removeDependenciesForWorkflow(workflowId); + }); + } + async buildIndex() { const batchSize = this.batchSize; let processedCount = 0; diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 9de2ea84899..40754ce2ff1 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -1,5 +1,5 @@ import { inDevelopment, inProduction } from '@n8n/backend-common'; -import { SecurityConfig } from '@n8n/config'; +import { DatabaseConfig, SecurityConfig, WorkflowsConfig } from '@n8n/config'; import { Time } from '@n8n/constants'; import type { APIRequest, AuthenticatedRequest } from '@n8n/db'; import { Container, Service } from '@n8n/di'; @@ -278,6 +278,11 @@ export class Server extends AbstractServer { await eventBus.initialize(); Container.get(LogStreamingEventRelay).init(); + // ---------------------------------------- + // Workflow Indexing Setup + // ---------------------------------------- + await this.initializeWorkflowIndexing(); + if (this.endpointPresetCredentials !== '') { // POST endpoint to set preset credentials const overwriteEndpointMiddleware = @@ -488,6 +493,21 @@ export class Server extends AbstractServer { } } + private async initializeWorkflowIndexing() { + if (Container.get(WorkflowsConfig).indexingEnabled) { + if (Container.get(DatabaseConfig).isLegacySqlite) { + this.logger.warn( + 'Workflow indexing is disabled because legacy Sqlite databases are not supported. Please migrate the database to enable workflow indexing.', + ); + return; + } + const { WorkflowIndexService } = await import( + '@/modules/workflow-index/workflow-index.service' + ); + Container.get(WorkflowIndexService).init(); + } + } + protected setupPushServer(): void { const { restEndpoint, server, app } = this; Container.get(Push).setupPushServer(restEndpoint, server, app); diff --git a/packages/cli/src/services/__tests__/import.service.test.ts b/packages/cli/src/services/__tests__/import.service.test.ts index ca64d8d22a7..649fbfe1a9c 100644 --- a/packages/cli/src/services/__tests__/import.service.test.ts +++ b/packages/cli/src/services/__tests__/import.service.test.ts @@ -8,6 +8,8 @@ import type { Cipher } from 'n8n-core'; import { ImportService } from '../import.service'; import type { CredentialsRepository, TagRepository } from '@n8n/db'; import type { ActiveWorkflowManager } from '@/active-workflow-manager'; +import type { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service'; +import type { DatabaseConfig } from '@n8n/config'; // Mock fs/promises jest.mock('fs/promises'); @@ -38,6 +40,8 @@ describe('ImportService', () => { let mockEntityManager: EntityManager; let mockCipher: Cipher; let mockActiveWorkflowManager: ActiveWorkflowManager; + let mockWorkflowIndexService: WorkflowIndexService; + let mockDatabaseConfig: DatabaseConfig; beforeEach(() => { jest.clearAllMocks(); @@ -49,6 +53,8 @@ describe('ImportService', () => { mockEntityManager = mock(); mockCipher = mock(); mockActiveWorkflowManager = mock(); + mockWorkflowIndexService = mock(); + mockDatabaseConfig = mock(); // Set up cipher mock mockCipher.decrypt = jest.fn((data: string) => data.replace('encrypted:', '')); @@ -94,6 +100,8 @@ describe('ImportService', () => { mockDataSource, mockCipher, mockActiveWorkflowManager, + mockWorkflowIndexService, + mockDatabaseConfig, ); }); diff --git a/packages/cli/src/services/import.service.ts b/packages/cli/src/services/import.service.ts index 010c8d18341..a24da08fcbd 100644 --- a/packages/cli/src/services/import.service.ts +++ b/packages/cli/src/services/import.service.ts @@ -21,6 +21,8 @@ import { Cipher } from 'n8n-core'; import { decompressFolder } from '@/utils/compression.util'; import { z } from 'zod'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; +import { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service'; +import { DatabaseConfig } from '@n8n/config'; @Service() export class ImportService { @@ -52,6 +54,8 @@ export class ImportService { private readonly dataSource: DataSource, private readonly cipher: Cipher, private readonly activeWorkflowManager: ActiveWorkflowManager, + private readonly workflowIndexService: WorkflowIndexService, + private readonly databaseConfig: DatabaseConfig, ) {} async initRecords() { @@ -79,6 +83,7 @@ export class ImportService { } } + const insertedWorkflows: IWorkflowBase[] = []; const { manager: dbManager } = this.credentialsRepository; await dbManager.transaction(async (tx) => { for (const workflow of workflows) { @@ -92,6 +97,7 @@ export class ImportService { const upsertResult = await tx.upsert(WorkflowEntity, workflow, ['id']); const workflowId = upsertResult.identifiers.at(0)?.id as string; + insertedWorkflows.push({ ...workflow, id: workflowId }); // Collect inserted workflow with correct ID, for indexing later. const personalProject = await tx.findOneByOrFail(Project, { id: projectId }); @@ -116,6 +122,15 @@ export class ImportService { } } }); + + // Directly update the index for the important workflows, since they don't generate + // workflow-update events during import. + // Workflow indexing isn't supported on legacy SQLite. + if (!this.databaseConfig.isLegacySqlite) { + for (const workflow of insertedWorkflows) { + await this.workflowIndexService.updateIndexFor(workflow); + } + } } async replaceInvalidCreds(workflow: IWorkflowBase) { diff --git a/packages/cli/test/integration/import.service.test.ts b/packages/cli/test/integration/import.service.test.ts index c625fd6e874..a7c9b346eab 100644 --- a/packages/cli/test/integration/import.service.test.ts +++ b/packages/cli/test/integration/import.service.test.ts @@ -6,6 +6,7 @@ import { newWorkflow, testDb, } from '@n8n/backend-test-utils'; +import { DatabaseConfig } from '@n8n/config'; import type { Project, User } from '@n8n/db'; import { TagEntity, @@ -20,6 +21,7 @@ import type { INode } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; import type { ActiveWorkflowManager } from '@/active-workflow-manager'; +import type { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service'; import { ImportService } from '@/services/import.service'; import { createMember, createOwner } from './shared/db/users'; @@ -30,6 +32,7 @@ describe('ImportService', () => { let owner: User; let ownerPersonalProject: Project; let mockActiveWorkflowManager: ActiveWorkflowManager; + let mockWorkflowIndexService: WorkflowIndexService; beforeAll(async () => { await testDb.init(); @@ -43,6 +46,8 @@ describe('ImportService', () => { mockActiveWorkflowManager = mock(); + mockWorkflowIndexService = mock(); + importService = new ImportService( mock(), credentialsRepository, @@ -50,6 +55,8 @@ describe('ImportService', () => { mock(), mock(), mockActiveWorkflowManager, + mockWorkflowIndexService, + Container.get(DatabaseConfig), ); }); @@ -71,6 +78,11 @@ describe('ImportService', () => { if (!dbWorkflow) fail('Expected to find workflow'); expect(dbWorkflow.id).toBe(workflowToImport.id); + if (Container.get(DatabaseConfig).isLegacySqlite) { + expect(mockWorkflowIndexService.updateIndexFor).not.toHaveBeenCalled(); + } else { + expect(mockWorkflowIndexService.updateIndexFor).toHaveBeenCalledWith(workflowToImport); + } }); test('should make user owner of imported workflow', async () => { diff --git a/packages/cli/test/integration/workflows/workflow-index.test.ts b/packages/cli/test/integration/workflows/workflow-index.test.ts new file mode 100644 index 00000000000..5c54ebb2e2e --- /dev/null +++ b/packages/cli/test/integration/workflows/workflow-index.test.ts @@ -0,0 +1,127 @@ +import { Logger } from '@n8n/backend-common'; +import { testDb } from '@n8n/backend-test-utils'; +import { DatabaseConfig } from '@n8n/config'; +import type { IWorkflowDb } from '@n8n/db'; +import { WorkflowDependencyRepository, WorkflowRepository } from '@n8n/db'; +import { Container } from '@n8n/di'; +import { retryUntil } from '@test-integration/retry-until'; +import { ErrorReporter } from 'n8n-core'; +import { v4 as uuid } from 'uuid'; + +import { createOwner } from '../shared/db/users'; + +import { EventService } from '@/events/event.service'; +import { WorkflowIndexService } from '@/modules/workflow-index/workflow-index.service'; + +let workflowIndexService: WorkflowIndexService; +let eventService: EventService; +let workflowRepository: WorkflowRepository; +let workflowDependencyRepository: WorkflowDependencyRepository; + +beforeAll(async () => { + await testDb.init(); + + // Get real instances from the container + workflowRepository = Container.get(WorkflowRepository); + workflowDependencyRepository = Container.get(WorkflowDependencyRepository); + eventService = Container.get(EventService); + + // Create the WorkflowIndexService with real dependencies + workflowIndexService = new WorkflowIndexService( + workflowDependencyRepository, + workflowRepository, + eventService, + Container.get(Logger), + Container.get(ErrorReporter), + ); + + // Initialize the service to register event listeners + workflowIndexService.init(); +}); + +afterEach(async () => { + await testDb.truncate(['WorkflowEntity', 'WorkflowDependency']); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +describe('WorkflowIndexService Integration', () => { + if (Container.get(DatabaseConfig).isLegacySqlite) { + // NOTE: this feature isn't supported on legacy SQLite databases, so we skip the tests. + it('is not supported on legacy SQLite databases', async () => {}); + return; + } + + describe('workflow-created event', () => { + it('should index a new workflow with a single node', async () => { + // Arrange + const owner = await createOwner(); + const workflowId = uuid(); + const versionId = uuid(); + + const workflow = { + id: workflowId, + name: 'Test Workflow', + active: false, + versionCounter: 1, + versionId, + nodes: [ + { + id: 'node-1', + name: 'HTTP Request', + type: 'n8n-nodes-base.httpRequest', + typeVersion: 1, + position: [250, 300] as [number, number], + parameters: {}, + }, + ], + connections: {}, + settings: {}, + triggerCount: 0, + isArchived: false, + createdAt: new Date(), + updatedAt: new Date(), + } satisfies IWorkflowDb; + + // Save the workflow to the database + const savedWorkflow = await workflowRepository.save(workflow); + + // Act - emit the workflow-created event + eventService.emit('workflow-created', { + user: { + id: owner.id, + email: owner.email, + firstName: owner.firstName, + lastName: owner.lastName, + role: { slug: owner.role.slug }, + }, + workflow: savedWorkflow, + publicApi: false, + projectId: uuid(), + projectType: 'personal', + }); + + await retryUntil(async () => { + // Assert - check that dependencies were indexed in the database + const dependencies = await workflowDependencyRepository.find({ + where: { workflowId }, + }); + + expect(dependencies).toHaveLength(1); + expect(dependencies[0]).toMatchObject({ + workflowId, + workflowVersionId: 1, + dependencyType: 'nodeType', + dependencyKey: 'n8n-nodes-base.httpRequest', + dependencyInfo: { + nodeId: 'node-1', + nodeVersion: 1, + }, + indexVersionId: 1, + }); + }); + }); + }); +});