n8n/packages/testing/containers/stack.ts
Declan Carroll 16a36186f2
ci: Tighten n8n testcontainer wait strategy and add sequential service start (#29352)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 11:36:03 +00:00

382 lines
12 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import getPort from 'get-port';
import type { StartedNetwork, StartedTestContainer, StoppedTestContainer } from 'testcontainers';
import { Network } from 'testcontainers';
import { createElapsedLogger, pollContainerHttpEndpoint } from './helpers/utils';
import { waitForNetworkQuiet } from './network-stabilization';
import type { LoadBalancerResult } from './services/load-balancer';
import { createN8NInstances } from './services/n8n';
import { helperFactories, services } from './services/registry';
import type {
FileToMount,
HelperContext,
HelperFactories,
Service,
ServiceHelpers,
ServiceName,
ServiceResult,
StackConfig,
StartContext,
} from './services/types';
import { createTelemetryRecorder } from './telemetry';
/** The imported service registry, typed so it can be indexed by any `ServiceName`. */
const SERVICE_REGISTRY: Record<ServiceName, Service> = services;
/** Configuration accepted by `createN8NStack`; an alias of `StackConfig`. */
export type N8NConfig = StackConfig;
/** Handle to a started stack, as returned by `createN8NStack`. */
export interface N8NStack {
/** Load balancer base URL when a load balancer result exists, otherwise `http://localhost:<allocated main port>`. */
baseUrl: string;
/** The `projectName` from the config, or a generated `n8n-stack-<random>` name when none was given. */
projectName: string;
/** Stops the tracked containers and then the network. */
stop: () => Promise<void>;
/** Every service container and n8n container tracked by the stack. */
containers: StartedTestContainer[];
/** Start results of the services that were started, keyed by service name. */
serviceResults: Partial<Record<ServiceName, ServiceResult>>;
/** Service helpers; each helper is built by its factory on first access and then cached. */
services: ServiceHelpers;
/** Shortcut for `services.observability.logs`. */
logs: ServiceHelpers['observability']['logs'];
/** Shortcut for `services.observability.metrics`. */
metrics: ServiceHelpers['observability']['metrics'];
/** Returns all tracked containers whose name matches the pattern (a string is compiled with `new RegExp`). */
findContainers: (namePattern: string | RegExp) => StartedTestContainer[];
/** Stops the first tracked container whose name matches the pattern; resolves to `null` when none matches. */
stopContainer: (namePattern: string | RegExp) => Promise<StoppedTestContainer | null>;
/** Direct URLs to each main instance (bypasses load balancer). Index 0 = main-1, etc. */
mainUrls: string[];
}
/**
 * Decides whether a registered service takes part in this stack.
 *
 * A service listed in `ctx.config.services` always starts. Otherwise the decision is delegated
 * to the service's optional `shouldStart` hook; a service without that hook does not start.
 *
 * @param name - Registry key of the service.
 * @param service - Registry entry for `name`.
 * @param ctx - Start context holding the resolved stack configuration.
 * @returns `true` when the service should be started.
 */
function shouldServiceStart(name: ServiceName, service: Service, ctx: StartContext): boolean {
	const explicitlyRequested = ctx.config.services?.includes(name) ?? false;
	if (explicitlyRequested) {
		return true;
	}

	// Not requested by name: let the service decide, defaulting to "no".
	return service.shouldStart ? service.shouldStart(ctx) : false;
}
/**
 * Splits the services into ordered tiers so that every service appears in a later tier than
 * the services it depends on.
 *
 * A dependency that is not part of `serviceNames` is treated as already satisfied. Within a
 * tier, services keep the relative order they have in `serviceNames`.
 *
 * @param serviceNames - Services that are going to be started.
 * @returns Tiers in start order; empty when `serviceNames` is empty.
 * @throws Error when no remaining service can be placed, i.e. the dependencies form a cycle.
 */
function groupByDependencyLevel(serviceNames: ServiceName[]): ServiceName[][] {
	const placed = new Set<ServiceName>();
	const tiers: ServiceName[][] = [];

	// A dependency no longer blocks once it is placed, or if it is not being started at all.
	const isSatisfied = (dep: ServiceName): boolean =>
		placed.has(dep) || !serviceNames.includes(dep);

	while (placed.size < serviceNames.length) {
		const tier = serviceNames.filter((candidate) => {
			if (placed.has(candidate)) {
				return false;
			}
			const dependencies = SERVICE_REGISTRY[candidate]?.dependsOn ?? [];
			return dependencies.every((dep) => isSatisfied(dep));
		});

		// Nothing could be placed although services remain: they must depend on each other.
		if (tier.length === 0) {
			throw new Error('Circular dependency detected in services');
		}

		tiers.push(tier);
		for (const member of tier) {
			placed.add(member);
		}
	}

	return tiers;
}
/**
 * Starts a complete n8n test stack and returns a handle for using and stopping it.
 *
 * Startup order:
 * 1. Reserve a host port (for the load balancer when `mains > 1`, otherwise for the single main).
 * 2. Create the testcontainers network.
 * 3. Start every registered service that is explicitly requested or whose `shouldStart` hook
 *    opts in, one dependency level at a time, merging the environment each service contributes.
 * 4. Start the n8n containers via `createN8NInstances`, then poll the load balancer if present.
 * 5. Run the services' `verifyFromN8n` hooks and wait for the network to go quiet.
 *
 * If anything fails once the network exists, telemetry is flushed as a failure, the containers
 * tracked so far and the network are stopped on a best-effort basis, and the original error is
 * rethrown. Containers that a failing `service.start` or `createN8NInstances` call created
 * internally but never returned cannot be tracked here.
 *
 * @param config - Stack options. Defaults: `mains` 1, `workers` 0, `postgres` false, no extra
 *   `env`, no extra `services`, `external` false. Postgres is forced on in queue mode
 *   (`mains > 1` or `workers > 0`) and when the `keycloak` service is requested.
 * @returns Handle to the started stack.
 * @throws Rethrows network creation errors, service start errors (wrapped with the service
 *   description and name), n8n startup errors and verification errors.
 */
export async function createN8NStack(config: N8NConfig = {}): Promise<N8NStack> {
	const {
		mains = 1,
		workers = 0,
		postgres: usePostgresConfig = false,
		env = {},
		projectName,
		resourceQuota,
		workerResourceQuota,
		services: enabledServices = [],
		external = false,
	} = config;

	const log = createElapsedLogger('stack');
	const isQueueMode = mains > 1 || workers > 0;
	const needsLoadBalancer = mains > 1;
	const usePostgres = usePostgresConfig || isQueueMode || enabledServices.includes('keycloak');
	const uniqueProjectName = projectName ?? `n8n-stack-${Math.random().toString(36).substring(7)}`;

	// Only one host port is needed: the load balancer's, or the single main's.
	let allocatedMainPort: number | undefined;
	let allocatedLbPort: number | undefined;
	if (needsLoadBalancer) {
		allocatedLbPort = await getPort();
	} else {
		allocatedMainPort = await getPort();
	}

	const containers: StartedTestContainer[] = [];
	const serviceResults: Record<string, ServiceResult> = {};
	let environment: Record<string, string> = {};

	log(`Starting: ${uniqueProjectName}`);
	const telemetry = createTelemetryRecorder(config);

	let network: StartedNetwork;
	try {
		const networkStart = performance.now();
		network = await new Network().start();
		telemetry.recordNetwork(Math.round(performance.now() - networkStart));
	} catch (error) {
		// Nothing has been started yet, so there is nothing to clean up.
		telemetry.flush(false, `Network creation failed: ${getErrorMessage(error)}`);
		throw error;
	}

	try {
		const ctx: StartContext = {
			config: { ...config, postgres: usePostgres },
			projectName: uniqueProjectName,
			mains,
			workers,
			isQueueMode,
			usePostgres,
			needsLoadBalancer,
			external,
			environment,
			serviceResults,
			allocatedPorts: {
				main: allocatedMainPort,
				loadBalancer: allocatedLbPort,
			},
		};

		// Step 1: Start services sequentially within each dependency level.
		// Sequential is intentional: parallel start on 2-vCPU CI runners thrashes CPU during
		// each container's JIT/init spike and pushes services past their startup timeouts.
		// Local benchmarks showed individual containers booting 2-4x faster sequentially under
		// contention, with only modest wall-clock cost on uncontended hardware.
		const allServiceNames = Object.keys(SERVICE_REGISTRY) as ServiceName[];
		const servicesToStart = allServiceNames.filter((name) =>
			shouldServiceStart(name, SERVICE_REGISTRY[name], ctx),
		);
		const dependencyLevels = groupByDependencyLevel(servicesToStart);

		// Starts one service and records its duration whether it succeeds or fails.
		const startService = async (name: ServiceName) => {
			const service = SERVICE_REGISTRY[name];
			const options = service.getOptions?.(ctx);
			const serviceStart = performance.now();
			try {
				const result = await service.start(network, uniqueProjectName, options, ctx);
				telemetry.recordService(name, Math.round(performance.now() - serviceStart));
				return { name, service, result };
			} catch (error) {
				telemetry.recordService(name, Math.round(performance.now() - serviceStart));
				throw new Error(
					`Service "${service.description}" (${name}) failed to start: ${getErrorMessage(error)}`,
				);
			}
		};

		for (const level of dependencyLevels) {
			const levelNames = level.map((name) => SERVICE_REGISTRY[name].description).join(', ');
			const results: Array<Awaited<ReturnType<typeof startService>>> = [];

			for (const name of level) {
				const started = await startService(name);
				const { result } = started;
				// Some services (e.g., tracing) return multiple containers
				const serviceContainers =
					'containers' in result && Array.isArray(result.containers)
						? (result.containers as StartedTestContainer[])
						: [result.container];
				// Track containers as soon as they exist so that a failure later in this level
				// can still stop them during cleanup.
				containers.push(...serviceContainers);
				results.push(started);
			}

			// Results and environment are published only after the whole level has started, so
			// services within one level never observe each other through `ctx`.
			for (const { name, service, result } of results) {
				serviceResults[name] = result;
				if (service.env) {
					environment = { ...environment, ...service.env(result) };
				}
				if (service.extraEnv) {
					environment = { ...environment, ...service.extraEnv(result) };
				}
			}

			ctx.environment = environment;
			ctx.serviceResults = serviceResults;
			log(`Services ready: ${levelNames}`);
		}

		// Step 2: Start n8n (main 1 first for DB setup, then rest in parallel)
		const lbResult = serviceResults.loadBalancer as LoadBalancerResult | undefined;
		const baseUrl = lbResult?.meta.baseUrl ?? `http://localhost:${allocatedMainPort}`;

		// Services may ask for files to be mounted into the n8n containers via their result meta.
		const filesToMount: FileToMount[] = Object.values(serviceResults).flatMap((result) => {
			const meta = result.meta as { n8nFilesToMount?: FileToMount[] } | undefined;
			return meta?.n8nFilesToMount ?? [];
		});

		const n8nStartupStart = performance.now();
		const n8nResult = await createN8NInstances({
			mains,
			workers,
			projectName: uniqueProjectName,
			network,
			serviceEnvironment: environment,
			userEnvironment: env,
			usePostgres,
			baseUrl: needsLoadBalancer ? undefined : baseUrl,
			allocatedPort: needsLoadBalancer ? undefined : allocatedMainPort,
			resourceQuota,
			workerResourceQuota,
			filesToMount,
		});
		containers.push(...n8nResult.containers);
		telemetry.recordN8nStartup(
			Math.round(performance.now() - n8nStartupStart),
			n8nResult.containers.length,
		);
		log(`n8n ready: ${mains} main(s), ${workers} worker(s)`);

		if (lbResult) {
			await pollContainerHttpEndpoint(lbResult.container, '/healthz/readiness');
			log('Load balancer ready');
		}

		ctx.baseUrl = baseUrl;

		// Build direct main URLs (bypassing load balancer)
		const mainUrls: string[] = [];
		for (let i = 1; i <= mains; i++) {
			const mainNamePattern = mains > 1 ? `-n8n-main-${i}` : '-n8n';
			const mainContainer = containers.find((c) => c.getName().endsWith(mainNamePattern));
			if (mainContainer) {
				const mainPort = mainContainer.getMappedPort(5678);
				mainUrls.push(`http://localhost:${mainPort}`);
			}
		}
		log(`Direct main URLs: ${mainUrls.join(', ')}`);

		// Run verification hooks (e.g. keycloak connectivity check)
		const n8nContainers = containers.filter((c) => {
			const name = c.getName();
			return name.includes('-n8n-main-') || name.endsWith('-n8n');
		});
		const verifications: string[] = [];
		for (const name of servicesToStart) {
			const service = SERVICE_REGISTRY[name];
			if (service.verifyFromN8n && serviceResults[name]) {
				await service.verifyFromN8n(serviceResults[name], n8nContainers);
				verifications.push(service.description);
			}
		}
		if (verifications.length > 0) {
			log(`Verified: ${verifications.join(', ')}`);
		}

		await waitForNetworkQuiet();
		telemetry.flush(true);

		const helperCtx: HelperContext = {
			containers,
			findContainer: (pattern: RegExp) => containers.find((c) => pattern.test(c.getName())),
			serviceResults,
		};

		// Helpers are created lazily: the proxy builds one on first access and caches it.
		const helperCache: Partial<ServiceHelpers> = {};
		const servicesProxy = new Proxy({} as ServiceHelpers, {
			get: <K extends keyof ServiceHelpers>(
				_target: ServiceHelpers,
				prop: K,
			): ServiceHelpers[K] => {
				if (prop in helperCache) {
					return helperCache[prop]!;
				}
				const factory = (helperFactories as HelperFactories)[prop];
				if (!factory) {
					throw new Error(
						`No helper factory found for service: ${String(prop)}. ` +
							`Available helpers: ${Object.keys(helperFactories).join(', ')}`,
					);
				}
				const helper = factory(helperCtx);
				helperCache[prop] = helper;
				return helper;
			},
			// The traps below make `in`, `Object.keys` and spreading reflect the known factories.
			has: (_target, prop) => prop in helperFactories,
			ownKeys: () => Object.keys(helperFactories),
			getOwnPropertyDescriptor: (_target, prop) => {
				if (prop in helperFactories) {
					return { enumerable: true, configurable: true };
				}
				return undefined;
			},
		});

		return {
			baseUrl,
			projectName: uniqueProjectName,
			stop: async () => await stopN8NStack(containers, network, uniqueProjectName),
			containers,
			serviceResults,
			services: servicesProxy,
			get logs() {
				return servicesProxy.observability.logs;
			},
			get metrics() {
				return servicesProxy.observability.metrics;
			},
			findContainers(namePattern: string | RegExp): StartedTestContainer[] {
				const regex = typeof namePattern === 'string' ? new RegExp(namePattern) : namePattern;
				return containers.filter((container) => regex.test(container.getName()));
			},
			async stopContainer(namePattern: string | RegExp): Promise<StoppedTestContainer | null> {
				const regex = typeof namePattern === 'string' ? new RegExp(namePattern) : namePattern;
				const container = containers.find((c) => regex.test(c.getName()));
				return container ? await container.stop() : null;
			},
			mainUrls,
		};
	} catch (error) {
		telemetry.flush(false, getErrorMessage(error));

		// Best-effort teardown: the caller never receives a stack handle on failure, so nothing
		// else in this process could stop what has been started so far. A cleanup problem must
		// not hide the error that caused the failure.
		try {
			await stopN8NStack(containers, network, uniqueProjectName);
		} catch (cleanupError) {
			log(`Cleanup after failed startup also failed: ${getErrorMessage(cleanupError)}`);
		}

		throw error;
	}
}
/**
 * Turns any thrown value into a printable message.
 *
 * @param error - The caught value; not necessarily an `Error`.
 * @returns `error.message` for `Error` instances, otherwise the value converted with `String`.
 */
function getErrorMessage(error: unknown): string {
	return error instanceof Error ? error.message : String(error);
}
/**
 * Stops every container of a stack and then its network.
 *
 * Containers are stopped concurrently, with the stop calls issued in reverse order of the
 * `containers` array. A failure to stop a single container or the network does not abort the
 * teardown; such failures are collected and reported together through `console.warn`.
 *
 * @param containers - Containers to stop. The array itself is left unmodified.
 * @param network - Network to stop once all container stops have settled.
 * @param uniqueProjectName - Stack name, used only in log messages.
 * @throws Rethrows any unexpected error raised outside the per-resource handlers, after
 *   logging it with `console.error`.
 */
async function stopN8NStack(
	containers: StartedTestContainer[],
	network: StartedNetwork,
	uniqueProjectName: string,
): Promise<void> {
	const errors: Error[] = [];
	try {
		// Reverse a copy: `Array.prototype.reverse` works in place, and callers keep using this
		// same array afterwards (it is the one exposed on the stack handle).
		const stopPromises = [...containers].reverse().map(async (container) => {
			try {
				await container.stop();
			} catch (error) {
				errors.push(
					new Error(`Failed to stop container ${container.getId()}: ${getErrorMessage(error)}`),
				);
			}
		});
		await Promise.allSettled(stopPromises);

		// The network is stopped only after every container stop has settled.
		try {
			await network.stop();
		} catch (error) {
			errors.push(
				new Error(`Failed to stop network ${network.getName()}: ${getErrorMessage(error)}`),
			);
		}

		if (errors.length > 0) {
			console.warn(
				`Some cleanup operations failed for stack ${uniqueProjectName}:`,
				errors.map((e) => e.message).join(', '),
			);
		}
	} catch (error) {
		console.error(`Critical error during cleanup for stack ${uniqueProjectName}:`, error);
		throw error;
	}
}