n8n/packages/@n8n/task-runner-python/tests/unit/test_task_analyzer.py
n8n-assistant[bot] 85b7796434
chore: Bundle 2.x (#28844)
Co-authored-by: Matsu <matias.huhta@n8n.io>
Co-authored-by: Dawid Myslak <dawid.myslak@gmail.com>
Co-authored-by: Bernhard Wittmann <bernhard.wittmann@n8n.io>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Dimitri Lavrenük <20122620+dlavrenuek@users.noreply.github.com>
Co-authored-by: Benjamin Schroth <68321970+schrothbn@users.noreply.github.com>
Co-authored-by: Danny Martini <danny@n8n.io>
Co-authored-by: RomanDavydchuk <roman.davydchuk@n8n.io>
Co-authored-by: Sandra Zollner <sandra.zollner@n8n.io>
Co-authored-by: Milorad FIlipović <milorad@n8n.io>
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
2026-04-22 08:51:32 +03:00

516 lines
17 KiB
Python

import pytest
from src.errors.security_violation_error import SecurityViolationError
from src.task_analyzer import TaskAnalyzer
from src.config.security_config import SecurityConfig
from src.constants import BLOCKED_ATTRIBUTES, BLOCKED_NAMES
class TestTaskAnalyzer:
@pytest.fixture
def analyzer(self) -> TaskAnalyzer:
security_config = SecurityConfig(
stdlib_allow={
"json",
"math",
"re",
"datetime",
"random",
"string",
"collections",
"itertools",
"functools",
"operator",
},
external_allow=set(),
builtins_deny=set(),
runner_env_deny=True,
)
return TaskAnalyzer(security_config)
class TestImportValidation(TestTaskAnalyzer):
def test_allowed_standard_imports(self, analyzer: TaskAnalyzer) -> None:
valid_imports = [
"import json",
"import math",
"from datetime import datetime",
"from collections import Counter",
"import re as regex",
"from itertools import chain, cycle",
"from math import *",
]
for code in valid_imports:
analyzer.validate(code)
def test_blocked_dangerous_imports(self, analyzer: TaskAnalyzer) -> None:
dangerous_imports = [
"import os",
"import sys",
"import subprocess",
"from os import path",
"import socket",
]
for code in dangerous_imports:
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_blocked_relative_imports(self, analyzer: TaskAnalyzer) -> None:
relative_imports = [
"from . import module",
"from .. import parent_module",
"from ...package import something",
]
for code in relative_imports:
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
class TestAttributeAccessValidation(TestTaskAnalyzer):
def test_all_blocked_attributes_are_blocked(self, analyzer: TaskAnalyzer) -> None:
for attr in BLOCKED_ATTRIBUTES:
code = f"obj.{attr}"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert attr in exc_info.value.description.lower()
def test_all_blocked_names_are_blocked(self, analyzer: TaskAnalyzer) -> None:
for name in BLOCKED_NAMES:
code = f"{name}"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert name in exc_info.value.description
def test_loader_access_attempts_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"__loader__.load_module('posix')",
"posix = __loader__.load_module('posix'); posix.system('echo')",
"module = __loader__.load_module('os')",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__loader__" in exc_info.value.description
def test_spec_access_attempts_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"__spec__.loader().load_module('posix')",
"posix = __spec__.loader().load_module('posix')",
"__spec__",
"loader = __spec__.loader()",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__spec__" in exc_info.value.description
def test_dunder_name_attempts_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"sys.modules[__name__]",
"builtins_module = sys.modules[__name__]",
"sys.modules[__name__].open('/etc/passwd', 'r')",
"builtins_module = sys.modules[__name__]; unfiltered_open = builtins_module.open",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__name__" in exc_info.value.description
def test_allowed_attribute_access(self, analyzer: TaskAnalyzer) -> None:
allowed_attributes = [
"obj.value",
"data.items()",
"list.append(item)",
"dict.keys()",
"str.upper()",
"math.pi",
]
for code in allowed_attributes:
analyzer.validate(code)
def test_name_mangled_attributes_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"license._Printer__filenames",
"obj._SomeClass__private_attr",
"help._Helper__name",
"credits._Printer__data",
"instance._MyClass__secret",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "name-mangled" in exc_info.value.description.lower()
def test_objclass_attribute_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"str.__or__.__objclass__",
"str.__init__.__objclass__",
"type_ref = str.__or__.__objclass__",
"object_ref = str.__init__.__objclass__",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__objclass__" in exc_info.value.description
def test_attribute_error_obj_blocked(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
"e.obj",
"exception.obj",
"error.obj",
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "obj" in exc_info.value.description
class TestDynamicImportDetection(TestTaskAnalyzer):
def test_various_dynamic_import_patterns(self, analyzer: TaskAnalyzer) -> None:
disallowed_dynamic_imports = [
"__import__('os')",
"import builtins; builtins.__import__('sys')",
"module_name = 'subprocess'; __import__(module_name)",
]
for code in disallowed_dynamic_imports:
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_allowed_modules_via_dynamic_import(self, analyzer: TaskAnalyzer) -> None:
allowed_dynamic_imports = [
"__import__('json')",
"__import__('math')",
"__import__('collections')",
"module = __import__('datetime')",
]
for code in allowed_dynamic_imports:
analyzer.validate(code)
class TestFormatStringAttacks(TestTaskAnalyzer):
def test_dangerous_format_patterns_blocked(self, analyzer: TaskAnalyzer) -> None:
dangerous_strings = [
# Attribute access patterns
'"{.__builtins__}".format(print)',
'"{.__class__}".format(obj)',
'"{.__globals__}".format(fn)',
'"{.__class__.__mro__}".format(obj)',
# Subscript access patterns
'"{.__builtins__[__import__]}".format(print)',
'"{[__import__]}".format(__builtins__)',
'fmt = "{.__class__}"',
'fmt = "{.__builtins__}"; fmt.format(obj)',
]
for code in dangerous_strings:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "disallowed" in exc_info.value.description.lower()
def test_safe_format_strings_allowed(self, analyzer: TaskAnalyzer) -> None:
safe_format_strings = [
'"Hello {}".format(name)',
'"{0} {1}".format(a, b)',
'"{name}".format(name="world")',
'"{.value}".format(obj)',
'"{[key]}".format(data)',
'"{:.2f}".format(3.14159)',
]
for code in safe_format_strings:
analyzer.validate(code)
def test_escaped_braces_allowed(self, analyzer: TaskAnalyzer) -> None:
safe_escaped = [
'"{{.__class__}}".format()',
'"{{.__builtins__}}".format()',
'"{{.__globals__}}".format()',
]
for code in safe_escaped:
analyzer.validate(code)
def test_fstring_blocked_attributes_detected(self, analyzer: TaskAnalyzer) -> None:
exploit_attempts = [
'f"{obj.__class__}"',
'f"{fn.__globals__}"',
]
for code in exploit_attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "disallowed" in exc_info.value.description.lower()
class TestMatchPatternValidation(TestTaskAnalyzer):
def test_match_pattern_with_blocked_attributes_blocked(
self, analyzer: TaskAnalyzer
) -> None:
attempts = [
"""
ex = None
try:
pass
except Exception as e:
ex = e
match ex:
case AttributeError(obj=rip):
pass
""",
"""
match error:
case ValueError(obj=x):
pass
""",
"""
match e:
case Exception(__traceback__=tb):
pass
""",
]
for code in attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "disallowed" in exc_info.value.description.lower()
def test_positional_match_patterns_blocked(self, analyzer: TaskAnalyzer) -> None:
attempts = [
"""
match error:
case ValueError(x):
pass
""",
"""
match obj:
case SomeClass(a, b):
pass
""",
]
for code in attempts:
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "positional" in exc_info.value.description.lower()
def test_safe_match_patterns_allowed(self, analyzer: TaskAnalyzer) -> None:
safe_patterns = [
"""
match value:
case 1:
pass
case "hello":
pass
""",
"""
match point:
case Point(x=x, y=y):
pass
""",
"""
match data:
case {"key": value}:
pass
""",
"""
match result:
case [first, *rest]:
pass
""",
]
for code in safe_patterns:
analyzer.validate(code)
class TestGlobalStatementValidation(TestTaskAnalyzer):
def test_global_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "global __builtins__"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_global_other_blocked_names(self, analyzer: TaskAnalyzer) -> None:
for name in BLOCKED_NAMES:
code = f"global {name}"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert name in exc_info.value.description
def test_global_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None:
safe = ["global x", "global my_var", "global counter"]
for code in safe:
analyzer.validate(code)
class TestFunctionDefNameValidation(TestTaskAnalyzer):
def test_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "def __builtins__(): pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_decorated_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = """\
@(lambda f: {})
def __builtins__(): pass
"""
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_async_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "async def __builtins__(): pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_funcdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None:
for name in BLOCKED_NAMES:
code = f"def {name}(): pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert name in exc_info.value.description
def test_async_funcdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None:
for name in BLOCKED_NAMES:
code = f"async def {name}(): pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert name in exc_info.value.description
def test_funcdef_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None:
safe = [
"def my_func(): pass",
"def helper(): pass",
"async def fetch(): pass",
]
for code in safe:
analyzer.validate(code)
class TestClassDefNameValidation(TestTaskAnalyzer):
def test_classdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "class __builtins__: pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_decorated_classdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None:
code = """\
@(lambda c: c)
class __builtins__: pass
"""
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert "__builtins__" in exc_info.value.description
def test_classdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None:
for name in BLOCKED_NAMES:
code = f"class {name}: pass"
with pytest.raises(SecurityViolationError) as exc_info:
analyzer.validate(code)
assert name in exc_info.value.description
def test_classdef_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None:
safe = [
"class MyClass: pass",
"class Helper: pass",
"class Foo(Bar): pass",
]
for code in safe:
analyzer.validate(code)
class TestReduceBlocked(TestTaskAnalyzer):
def test_reduce_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "obj.__reduce__()"
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_reduce_ex_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "obj.__reduce_ex__(2)"
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_prepare_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "obj.__prepare__()"
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_instancecheck_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "obj.__instancecheck__(x)"
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
def test_match_args_blocked(self, analyzer: TaskAnalyzer) -> None:
code = "obj.__match_args__"
with pytest.raises(SecurityViolationError):
analyzer.validate(code)
class TestFullExploitChainBlocked(TestTaskAnalyzer):
def test_full_poc_blocked(self, analyzer: TaskAnalyzer) -> None:
"""The split-string literals intentionally sidestep the constant-string
check; rejection comes from visit_MatchClass at the positional pattern."""
poc = """\
_int = int
_repr = repr
global __builtins__
@(lambda f: {"getattr": lambda o, n, *d: None})
def __builtins__(): pass
_obj = _int.__init__.__reduce__()[1][0]
_type = _int.__prepare__.__reduce__()[1][0]
ic = "__instancech" + "eck__"
M = _type("M", (_type,), {ic: lambda c, i: True})
ma = "__match_ar" + "gs__"
sa = "__se" + "lf__"
E = M("E", (_obj,), {ma: (sa,)})
bm = None
match _repr:
case E(bm):
pass
"""
with pytest.raises(SecurityViolationError):
analyzer.validate(poc)
class TestAllowAll(TestTaskAnalyzer):
def test_allow_all_bypasses_validation(self) -> None:
security_config = SecurityConfig(
stdlib_allow={"*"},
external_allow={"*"},
builtins_deny=set(),
runner_env_deny=True,
)
analyzer = TaskAnalyzer(security_config)
unsafe_allowed_code = [
"import os",
"import sys",
"__import__('subprocess')",
"obj.__subclasses__",
"from . import relative",
]
for code in unsafe_allowed_code:
analyzer.validate(code)