mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
fix(core): Defer requirements check for Python runner in internal mode (#22448)
This commit is contained in:
parent
344c90940e
commit
97d8b39326
|
|
@ -51,6 +51,8 @@ interface ExecuteFunctionObject {
|
|||
[name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject;
|
||||
}
|
||||
|
||||
export type RunnerStatus = { available: true } | { available: false; reason?: string };
|
||||
|
||||
@Service()
|
||||
export abstract class TaskRequester {
|
||||
requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map();
|
||||
|
|
@ -63,6 +65,8 @@ export abstract class TaskRequester {
|
|||
|
||||
private readonly executionIdsToTaskIds: Map<string, Set<string>> = new Map();
|
||||
|
||||
private readonly unavailableRunners: Map<string, string> = new Map();
|
||||
|
||||
constructor(
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
private readonly eventService: EventService,
|
||||
|
|
@ -71,6 +75,15 @@ export abstract class TaskRequester {
|
|||
private readonly errorReporter: ErrorReporter,
|
||||
) {}
|
||||
|
||||
setRunnerUnavailable(taskType: string, reason: string) {
|
||||
this.unavailableRunners.set(taskType, reason);
|
||||
}
|
||||
|
||||
getRunnerStatus(taskType: string): RunnerStatus {
|
||||
const reason = this.unavailableRunners.get(taskType);
|
||||
return reason ? { available: false, reason } : { available: true };
|
||||
}
|
||||
|
||||
async startTask<TData, TError>(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
taskType: string,
|
||||
|
|
|
|||
|
|
@ -14,8 +14,10 @@ import type { PyTaskRunnerProcess } from '@/task-runners/task-runner-process-py'
|
|||
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
|
||||
|
||||
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
|
||||
import { MissingRequirementsError } from './errors/missing-requirements.error';
|
||||
import type { TaskBrokerServer } from './task-broker/task-broker-server';
|
||||
import type { LocalTaskRequester } from './task-managers/local-task-requester';
|
||||
import { TaskRequester } from './task-managers/task-requester';
|
||||
|
||||
/**
|
||||
* Module responsible for loading and starting task runner. Task runner can be
|
||||
|
|
@ -126,6 +128,15 @@ export class TaskRunnerModule {
|
|||
|
||||
if (this.runnerConfig.isNativePythonRunnerEnabled) {
|
||||
const { PyTaskRunnerProcess } = await import('@/task-runners/task-runner-process-py');
|
||||
|
||||
const failureReason = await PyTaskRunnerProcess.checkRequirements();
|
||||
if (failureReason) {
|
||||
Container.get(TaskRequester).setRunnerUnavailable('python', failureReason);
|
||||
const error = new MissingRequirementsError(failureReason);
|
||||
this.logger.warn(error.message);
|
||||
return; // allow bootup, will fail at execution time
|
||||
}
|
||||
|
||||
this.pyRunnerProcess = Container.get(PyTaskRunnerProcess);
|
||||
this.pyRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
|
||||
this.pyRunnerProcess,
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import { access } from 'node:fs/promises';
|
|||
import path from 'node:path';
|
||||
import { promisify } from 'node:util';
|
||||
|
||||
import { MissingRequirementsError } from './errors/missing-requirements.error';
|
||||
import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service';
|
||||
import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events';
|
||||
import { TaskRunnerProcessBase } from './task-runner-process-base';
|
||||
|
|
@ -33,20 +32,8 @@ export class PyTaskRunnerProcess extends TaskRunnerProcessBase {
|
|||
}
|
||||
|
||||
async startProcess(grantToken: string, taskBrokerUri: string) {
|
||||
try {
|
||||
await asyncExec('python3 --version', { timeout: 5000 });
|
||||
} catch {
|
||||
throw new MissingRequirementsError('python');
|
||||
}
|
||||
|
||||
const pythonDir = path.join(__dirname, '../../../@n8n/task-runner-python');
|
||||
const venvPath = path.join(pythonDir, '.venv/bin/python');
|
||||
|
||||
try {
|
||||
await access(venvPath);
|
||||
} catch {
|
||||
throw new MissingRequirementsError('venv');
|
||||
}
|
||||
const venvPath = PyTaskRunnerProcess.getVenvPath();
|
||||
|
||||
return spawn(venvPath, ['-m', 'src.main'], {
|
||||
cwd: pythonDir,
|
||||
|
|
@ -65,4 +52,29 @@ export class PyTaskRunnerProcess extends TaskRunnerProcessBase {
|
|||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Python requirements are met for internal mode.
|
||||
* Returns the failure reason if requirements are missing, or `null` if all requirements are met.
|
||||
*/
|
||||
static async checkRequirements(): Promise<'python' | 'venv' | null> {
|
||||
try {
|
||||
await asyncExec('python3 --version', { timeout: 5000 });
|
||||
} catch {
|
||||
return 'python';
|
||||
}
|
||||
|
||||
try {
|
||||
await access(PyTaskRunnerProcess.getVenvPath());
|
||||
} catch {
|
||||
return 'venv';
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static getVenvPath() {
|
||||
const pythonDir = path.join(__dirname, '../../../@n8n/task-runner-python');
|
||||
return path.join(pythonDir, '.venv/bin/python');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -459,6 +459,7 @@ export async function getBase({
|
|||
},
|
||||
logAiEvent: (eventName: keyof AiEventMap, payload: AiEventPayload) =>
|
||||
eventService.emit(eventName, payload),
|
||||
getRunnerStatus: (taskType: string) => Container.get(TaskRequester).getRunnerStatus(taskType),
|
||||
};
|
||||
|
||||
for (const [moduleName, moduleContext] of Container.get(ModuleRegistry).context.entries()) {
|
||||
|
|
|
|||
|
|
@ -274,4 +274,8 @@ export class BaseExecuteContext extends NodeExecutionContext {
|
|||
this.executeData,
|
||||
);
|
||||
}
|
||||
|
||||
getRunnerStatus(taskType: string): { available: true } | { available: false; reason?: string } {
|
||||
return this.additionalData.getRunnerStatus?.(taskType) ?? { available: true };
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import { pythonCodeDescription } from './descriptions/PythonCodeDescription';
|
|||
import { JavaScriptSandbox } from './JavaScriptSandbox';
|
||||
import { JsTaskRunnerSandbox } from './JsTaskRunnerSandbox';
|
||||
import { NativePythonWithoutRunnerError } from './native-python-without-runner.error';
|
||||
import { PythonRunnerUnavailableError } from './python-runner-unavailable.error';
|
||||
import { PythonSandbox } from './PythonSandbox';
|
||||
import { PythonTaskRunnerSandbox } from './PythonTaskRunnerSandbox';
|
||||
import { getSandboxContext } from './Sandbox';
|
||||
|
|
@ -148,8 +149,17 @@ export class Code implements INodeType {
|
|||
: [await sandbox.runCodeForEachItem(numInputItems)];
|
||||
}
|
||||
|
||||
if (language === 'pythonNative' && !isPyRunner) {
|
||||
throw new NativePythonWithoutRunnerError();
|
||||
if (language === 'pythonNative') {
|
||||
if (!isPyRunner) {
|
||||
throw new NativePythonWithoutRunnerError();
|
||||
}
|
||||
|
||||
const runnerStatus = this.getRunnerStatus('python');
|
||||
if (!runnerStatus.available) {
|
||||
throw new PythonRunnerUnavailableError(
|
||||
runnerStatus.reason as 'python' | 'venv' | undefined,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (isPyLang && isPyRunner) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
import { UserError } from 'n8n-workflow';
|
||||
|
||||
type FailureReason = 'python' | 'venv';
|
||||
|
||||
const REASONS: Record<FailureReason, string> = {
|
||||
python: 'Python 3 is missing from this system',
|
||||
venv: 'Virtual environment is missing from this system',
|
||||
};
|
||||
|
||||
export class PythonRunnerUnavailableError extends UserError {
|
||||
constructor(reason?: FailureReason) {
|
||||
const message = reason
|
||||
? `Python runner unavailable: ${REASONS[reason]}`
|
||||
: 'Python runner unavailable';
|
||||
|
||||
super(message, {
|
||||
description:
|
||||
'Internal mode is intended only for debugging. For production, deploy in external mode: https://docs.n8n.io/hosting/configuration/task-runners/#setting-up-external-mode',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -1052,6 +1052,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
|
|||
settings: unknown,
|
||||
itemIndex: number,
|
||||
): Promise<Result<T, E>>;
|
||||
|
||||
getRunnerStatus(taskType: string): { available: true } | { available: false; reason?: string };
|
||||
};
|
||||
|
||||
export interface IExecuteSingleFunctions extends BaseExecutionFunctions {
|
||||
|
|
@ -2712,6 +2714,7 @@ export interface IWorkflowExecuteAdditionalData {
|
|||
envProviderState: EnvProviderState,
|
||||
executeData?: IExecuteData,
|
||||
): Promise<Result<T, E>>;
|
||||
getRunnerStatus?(taskType: string): { available: true } | { available: false; reason?: string };
|
||||
}
|
||||
|
||||
export type WorkflowActivateMode =
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user