import pytest from src.errors.security_violation_error import SecurityViolationError from src.task_analyzer import TaskAnalyzer from src.config.security_config import SecurityConfig from src.constants import BLOCKED_ATTRIBUTES, BLOCKED_NAMES class TestTaskAnalyzer: @pytest.fixture def analyzer(self) -> TaskAnalyzer: security_config = SecurityConfig( stdlib_allow={ "json", "math", "re", "datetime", "random", "string", "collections", "itertools", "functools", "operator", }, external_allow=set(), builtins_deny=set(), runner_env_deny=True, ) return TaskAnalyzer(security_config) class TestImportValidation(TestTaskAnalyzer): def test_allowed_standard_imports(self, analyzer: TaskAnalyzer) -> None: valid_imports = [ "import json", "import math", "from datetime import datetime", "from collections import Counter", "import re as regex", "from itertools import chain, cycle", "from math import *", ] for code in valid_imports: analyzer.validate(code) def test_blocked_dangerous_imports(self, analyzer: TaskAnalyzer) -> None: dangerous_imports = [ "import os", "import sys", "import subprocess", "from os import path", "import socket", ] for code in dangerous_imports: with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_blocked_relative_imports(self, analyzer: TaskAnalyzer) -> None: relative_imports = [ "from . import module", "from .. import parent_module", "from ...package import something", ] for code in relative_imports: with pytest.raises(SecurityViolationError): analyzer.validate(code) class TestAttributeAccessValidation(TestTaskAnalyzer): def test_all_blocked_attributes_are_blocked(self, analyzer: TaskAnalyzer) -> None: for attr in BLOCKED_ATTRIBUTES: code = f"obj.{attr}" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert attr in exc_info.value.description.lower() def test_all_blocked_names_are_blocked(self, analyzer: TaskAnalyzer) -> None: for name in BLOCKED_NAMES: code = f"{name}" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert name in exc_info.value.description def test_loader_access_attempts_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "__loader__.load_module('posix')", "posix = __loader__.load_module('posix'); posix.system('echo')", "module = __loader__.load_module('os')", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__loader__" in exc_info.value.description def test_spec_access_attempts_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "__spec__.loader().load_module('posix')", "posix = __spec__.loader().load_module('posix')", "__spec__", "loader = __spec__.loader()", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__spec__" in exc_info.value.description def test_dunder_name_attempts_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "sys.modules[__name__]", "builtins_module = sys.modules[__name__]", "sys.modules[__name__].open('/etc/passwd', 'r')", "builtins_module = sys.modules[__name__]; unfiltered_open = builtins_module.open", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__name__" in exc_info.value.description def test_allowed_attribute_access(self, analyzer: TaskAnalyzer) -> None: allowed_attributes = [ "obj.value", "data.items()", "list.append(item)", "dict.keys()", "str.upper()", "math.pi", ] for code in allowed_attributes: analyzer.validate(code) def test_name_mangled_attributes_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "license._Printer__filenames", "obj._SomeClass__private_attr", "help._Helper__name", "credits._Printer__data", "instance._MyClass__secret", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "name-mangled" in exc_info.value.description.lower() def test_objclass_attribute_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "str.__or__.__objclass__", "str.__init__.__objclass__", "type_ref = str.__or__.__objclass__", "object_ref = str.__init__.__objclass__", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__objclass__" in exc_info.value.description def test_attribute_error_obj_blocked(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ "e.obj", "exception.obj", "error.obj", ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "obj" in exc_info.value.description class TestDynamicImportDetection(TestTaskAnalyzer): def test_various_dynamic_import_patterns(self, analyzer: TaskAnalyzer) -> None: disallowed_dynamic_imports = [ "__import__('os')", "import builtins; builtins.__import__('sys')", "module_name = 'subprocess'; __import__(module_name)", ] for code in disallowed_dynamic_imports: with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_allowed_modules_via_dynamic_import(self, analyzer: TaskAnalyzer) -> None: allowed_dynamic_imports = [ "__import__('json')", "__import__('math')", "__import__('collections')", "module = __import__('datetime')", ] for code in allowed_dynamic_imports: analyzer.validate(code) class TestFormatStringAttacks(TestTaskAnalyzer): def test_dangerous_format_patterns_blocked(self, analyzer: TaskAnalyzer) -> None: dangerous_strings = [ # Attribute access patterns '"{.__builtins__}".format(print)', '"{.__class__}".format(obj)', '"{.__globals__}".format(fn)', '"{.__class__.__mro__}".format(obj)', # Subscript access patterns '"{.__builtins__[__import__]}".format(print)', '"{[__import__]}".format(__builtins__)', 'fmt = "{.__class__}"', 'fmt = "{.__builtins__}"; fmt.format(obj)', ] for code in dangerous_strings: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "disallowed" in exc_info.value.description.lower() def test_safe_format_strings_allowed(self, analyzer: TaskAnalyzer) -> None: safe_format_strings = [ '"Hello {}".format(name)', '"{0} {1}".format(a, b)', '"{name}".format(name="world")', '"{.value}".format(obj)', '"{[key]}".format(data)', '"{:.2f}".format(3.14159)', ] for code in safe_format_strings: analyzer.validate(code) def test_escaped_braces_allowed(self, analyzer: TaskAnalyzer) -> None: safe_escaped = [ '"{{.__class__}}".format()', '"{{.__builtins__}}".format()', '"{{.__globals__}}".format()', ] for code in safe_escaped: analyzer.validate(code) def test_fstring_blocked_attributes_detected(self, analyzer: TaskAnalyzer) -> None: exploit_attempts = [ 'f"{obj.__class__}"', 'f"{fn.__globals__}"', ] for code in exploit_attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "disallowed" in exc_info.value.description.lower() class TestMatchPatternValidation(TestTaskAnalyzer): def test_match_pattern_with_blocked_attributes_blocked( self, analyzer: TaskAnalyzer ) -> None: attempts = [ """ ex = None try: pass except Exception as e: ex = e match ex: case AttributeError(obj=rip): pass """, """ match error: case ValueError(obj=x): pass """, """ match e: case Exception(__traceback__=tb): pass """, ] for code in attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "disallowed" in exc_info.value.description.lower() def test_positional_match_patterns_blocked(self, analyzer: TaskAnalyzer) -> None: attempts = [ """ match error: case ValueError(x): pass """, """ match obj: case SomeClass(a, b): pass """, ] for code in attempts: with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "positional" in exc_info.value.description.lower() def test_safe_match_patterns_allowed(self, analyzer: TaskAnalyzer) -> None: safe_patterns = [ """ match value: case 1: pass case "hello": pass """, """ match point: case Point(x=x, y=y): pass """, """ match data: case {"key": value}: pass """, """ match result: case [first, *rest]: pass """, ] for code in safe_patterns: analyzer.validate(code) class TestGlobalStatementValidation(TestTaskAnalyzer): def test_global_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = "global __builtins__" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_global_other_blocked_names(self, analyzer: TaskAnalyzer) -> None: for name in BLOCKED_NAMES: code = f"global {name}" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert name in exc_info.value.description def test_global_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None: safe = ["global x", "global my_var", "global counter"] for code in safe: analyzer.validate(code) class TestFunctionDefNameValidation(TestTaskAnalyzer): def test_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = "def __builtins__(): pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_decorated_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = """\ @(lambda f: {}) def __builtins__(): pass """ with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_async_funcdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = "async def __builtins__(): pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_funcdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None: for name in BLOCKED_NAMES: code = f"def {name}(): pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert name in exc_info.value.description def test_async_funcdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None: for name in BLOCKED_NAMES: code = f"async def {name}(): pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert name in exc_info.value.description def test_funcdef_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None: safe = [ "def my_func(): pass", "def helper(): pass", "async def fetch(): pass", ] for code in safe: analyzer.validate(code) class TestClassDefNameValidation(TestTaskAnalyzer): def test_classdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = "class __builtins__: pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_decorated_classdef_builtins_blocked(self, analyzer: TaskAnalyzer) -> None: code = """\ @(lambda c: c) class __builtins__: pass """ with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert "__builtins__" in exc_info.value.description def test_classdef_other_blocked_names(self, analyzer: TaskAnalyzer) -> None: for name in BLOCKED_NAMES: code = f"class {name}: pass" with pytest.raises(SecurityViolationError) as exc_info: analyzer.validate(code) assert name in exc_info.value.description def test_classdef_safe_names_allowed(self, analyzer: TaskAnalyzer) -> None: safe = [ "class MyClass: pass", "class Helper: pass", "class Foo(Bar): pass", ] for code in safe: analyzer.validate(code) class TestReduceBlocked(TestTaskAnalyzer): def test_reduce_blocked(self, analyzer: TaskAnalyzer) -> None: code = "obj.__reduce__()" with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_reduce_ex_blocked(self, analyzer: TaskAnalyzer) -> None: code = "obj.__reduce_ex__(2)" with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_prepare_blocked(self, analyzer: TaskAnalyzer) -> None: code = "obj.__prepare__()" with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_instancecheck_blocked(self, analyzer: TaskAnalyzer) -> None: code = "obj.__instancecheck__(x)" with pytest.raises(SecurityViolationError): analyzer.validate(code) def test_match_args_blocked(self, analyzer: TaskAnalyzer) -> None: code = "obj.__match_args__" with pytest.raises(SecurityViolationError): analyzer.validate(code) class TestFullExploitChainBlocked(TestTaskAnalyzer): def test_full_poc_blocked(self, analyzer: TaskAnalyzer) -> None: """The split-string literals intentionally sidestep the constant-string check; rejection comes from visit_MatchClass at the positional pattern.""" poc = """\ _int = int _repr = repr global __builtins__ @(lambda f: {"getattr": lambda o, n, *d: None}) def __builtins__(): pass _obj = _int.__init__.__reduce__()[1][0] _type = _int.__prepare__.__reduce__()[1][0] ic = "__instancech" + "eck__" M = _type("M", (_type,), {ic: lambda c, i: True}) ma = "__match_ar" + "gs__" sa = "__se" + "lf__" E = M("E", (_obj,), {ma: (sa,)}) bm = None match _repr: case E(bm): pass """ with pytest.raises(SecurityViolationError): analyzer.validate(poc) class TestAllowAll(TestTaskAnalyzer): def test_allow_all_bypasses_validation(self) -> None: security_config = SecurityConfig( stdlib_allow={"*"}, external_allow={"*"}, builtins_deny=set(), runner_env_deny=True, ) analyzer = TaskAnalyzer(security_config) unsafe_allowed_code = [ "import os", "import sys", "__import__('subprocess')", "obj.__subclasses__", "from . import relative", ] for code in unsafe_allowed_code: analyzer.validate(code)