"""
@file obfuscator.py
@brief Core engine for OMG-Fuscator.
@details
Orchestrates the full obfuscation pipeline: symbol analysis and renaming,
string encryption, control-flow flattening, and junk code insertion.
Provides debug instrumentation and JSON reporting when enabled.
"""

import ast
import json
import os
import random
from datetime import datetime

from transformers.rename import RenameTransformer
from transformers.control_flow import ControlFlowFlattener
from transformers.class_analyzer import ClassAnalyzer, update_obfuscator_with_class_mappings
from transformers.attribute_transformer import AttributeTransformer
from transformers.symbol_tree import SymbolTreeBuilder, SymbolTree
from utils.name_gen import NameGenerator
from utils.junk_gen import JunkGenerator
from transformers.class_mapper import apply_class_mapping


class AdvancedObfuscator:
    """
    @brief Core obfuscation engine orchestrating all transformations.
    @details
    Coordinates name generation, symbol tree construction, AST transformations
    for renaming and string encryption, control-flow flattening, and final code
    generation with junk injection. Optionally records detailed debug data.
    """

    # Junk is never inserted directly before a line starting with one of these
    # prefixes. For clause headers (elif/else/except/finally) and decorated
    # definitions, an inserted statement would detach the line from the
    # construct it belongs to.
    _NO_JUNK_PREFIXES = (
        "def ", "async def ", "class ", "@", "try:", "except ", "finally:",
        "import ", "from ", "elif ", "else:", "return", "raise", "pass",
        "break", "continue",
    )

    def __init__(self, debug_mode=False):
        """
        @brief Set up generators, encryption keys and debug storage.
        @param debug_mode When True, transformation details are collected and
               written to a JSON report by obfuscate().
        """
        self.used_names = set()
        self.name_generator = NameGenerator()

        # Generate keys for string encryption. The call order (16, 8, 4 bytes)
        # is kept stable so a seeded `random` yields the same keys as before.
        # NOTE(review): `random` is not a cryptographic source; fine only if
        # these keys are meant for obfuscation rather than secrecy.
        self.primary_key = self._random_key(16)
        self.secondary_key = self._random_key(8)
        self.salt = self._random_key(4)

        # Symbol tree for global tracking of all symbols
        self.symbol_tree = None

        # Legacy mappings for backward compatibility
        self.class_attr_mapping = {}
        self.global_var_renames = {}

        # Initialize junk generator
        self.junk_gen = JunkGenerator(self.name_generator)

        # Debugging flags and data structures
        self.debug_mode = debug_mode
        self.debug_data = {
            "timestamp": datetime.now().isoformat(),
            "transformations": [],
            "class_mappings": {},
            "variable_mappings": {},
            "method_mappings": {},
            "string_encryption": [],
            "issues_detected": [],
            "control_flow_stats": {},
            "junk_stats": {"count": 0, "snippets": []}
        }

    @staticmethod
    def _random_key(length):
        """
        @brief Build a random byte string with every byte in 65..90.
        @param length Number of bytes to generate.
        @return bytes Key material of the requested length.
        """
        return bytes([random.randint(65, 90) for _ in range(length)])

    def log_debug(self, category, data):
        """
        @brief Append a structured debug entry if debug mode is enabled.
        @param category Logical stage or component name.
        @param data Arbitrary JSON-serializable payload to record.
        """
        if not self.debug_mode:
            return
        self.debug_data["transformations"].append({
            "stage": category,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

    def detect_issues(self):
        """
        @brief Run integrity checks and record any issues to debug data.
        @details
        Aggregates issues reported by the symbol tree (when one has been built)
        and runs the legacy collision checks on global_var_renames. Does
        nothing unless debug mode is enabled.
        """
        if not self.debug_mode:
            return

        issues = self.debug_data["issues_detected"]

        # First check issues in the symbol tree
        if self.symbol_tree:
            issues.extend(self.symbol_tree.check_for_issues())

        # Legacy checks for backwards compatibility
        seen_renames = set()
        duplicates = []
        for renamed in self.global_var_renames.values():
            if renamed in seen_renames:
                duplicates.append(renamed)
            seen_renames.add(renamed)

            # A generated name equal to some original name could lead to conflicts.
            if renamed in self.global_var_renames:
                issues.append({
                    "type": "name_collision",
                    "info": f"Renamed value '{renamed}' matches an original name"
                })

        if duplicates:
            issues.append({
                "type": "duplicate_names",
                "names": duplicates
            })

    def _build_symbol_tree(self, tree: ast.AST) -> SymbolTree:
        """
        @brief Build a global symbol tree from the parsed AST.
        @details
        Also mirrors the tree's rename mapping into the legacy
        global_var_renames and class_attr_mapping dictionaries.
        @param tree Parsed AST of the input source.
        @return SymbolTree Symbol tree after the name generator has been applied.
        """
        if self.debug_mode:
            self.log_debug("symbol_tree_building", "Building global symbol tree")

        builder = SymbolTreeBuilder()
        symbol_tree = builder.build_tree(tree)

        # Apply name generator to all symbols
        symbol_tree.apply_name_generator(self.name_generator)

        # Populate legacy mappings for backward compatibility
        rename_mapping = symbol_tree.get_rename_mapping()
        method_map = rename_mapping["methods"]
        attribute_map = rename_mapping["attributes"]

        for class_name, class_obf_name in rename_mapping["classes"].items():
            self.global_var_renames[class_name] = class_obf_name

            # Member mappings are keyed by the *obfuscated* class name.
            members = self.class_attr_mapping.setdefault(class_obf_name, {})
            # Methods first, then attributes, so an attribute entry wins on a
            # name clash (same precedence as the original copy loops).
            members.update(method_map.get(class_name, {}))
            members.update(attribute_map.get(class_name, {}))

        # Update function and variable mappings
        self.global_var_renames.update(rename_mapping["functions"])
        self.global_var_renames.update(rename_mapping["variables"])

        if self.debug_mode:
            self.log_debug("symbol_tree_stats", {
                "classes": len(rename_mapping["classes"]),
                "methods": sum(len(methods) for methods in method_map.values()),
                "attributes": sum(len(attrs) for attrs in attribute_map.values()),
                "functions": len(rename_mapping["functions"]),
                "variables": len(rename_mapping["variables"])
            })

        return symbol_tree

    def _rename_and_encrypt(self, tree: ast.AST) -> ast.AST:
        """
        @brief Rename identifiers and encrypt string literals in the AST.
        @details
        Builds the symbol tree first so the RenameTransformer runs with the
        populated rename mappings, then copies the transformer's debug data
        into this object's debug data when debug mode is on.
        @param tree Input AST prior to control-flow transformations.
        @return ast.AST AST returned by RenameTransformer, with locations fixed.
        """
        # First, build a comprehensive symbol tree
        self.symbol_tree = self._build_symbol_tree(tree)

        # Now perform the main transformation with consistent mappings
        transformer = RenameTransformer(
            self.name_generator,
            self.global_var_renames,
            self.class_attr_mapping,
            self.primary_key,
            self.secondary_key,
            self.salt,
            debug_mode=self.debug_mode
        )
        tree = transformer.visit(tree)

        # Capture debug data from transformer if available
        if self.debug_mode and hasattr(transformer, 'debug_data'):
            self.debug_data["variable_mappings"] = transformer.debug_data.get("variable_mappings", {})
            self.debug_data["string_encryption"] = transformer.debug_data.get("string_encryption", [])

        ast.fix_missing_locations(tree)
        return tree

    def _flatten_control_flow(self, tree: ast.AST) -> ast.AST:
        """
        @brief Flatten control flow into a state-machine dispatch form.
        @param tree AST after renaming/encryption.
        @return ast.AST AST returned by ControlFlowFlattener, with locations fixed.
        """
        flattener = ControlFlowFlattener(debug_mode=self.debug_mode)
        tree = flattener.visit(tree)

        # Capture debug data from flattener if available
        if self.debug_mode and hasattr(flattener, 'debug_data'):
            self.debug_data["control_flow_stats"] = flattener.debug_data

        ast.fix_missing_locations(tree)
        return tree

    @staticmethod
    def _in_try_block(lines, index):
        """
        @brief Heuristically decide whether a line belongs to a `try:` body.
        @details
        Scans backwards over lines indented no deeper than lines[index]. The
        first such line starting with `try:` means "inside a try"; the first
        starting with `except ` or `finally:` means "not inside".
        @param lines All source lines being processed.
        @param index Index of the line under test.
        @return bool True when an enclosing/adjacent `try:` is found first.
        """
        current = lines[index]
        current_indent = len(current) - len(current.lstrip())

        for idx in range(index - 1, -1, -1):
            candidate = lines[idx]
            stripped = candidate.lstrip()
            if len(candidate) - len(stripped) > current_indent:
                continue  # deeper-nested line: cannot open or close our block
            if stripped.startswith(("except ", "finally:")):
                return False
            if stripped.startswith("try:"):
                return True
        return False

    @staticmethod
    def _indent_snippet(snippet, indent):
        """
        @brief Prefix every non-blank line of a snippet with an indent string.
        @param snippet Text to indent; may span several lines.
        @param indent Whitespace prefix to add.
        @return str The indented snippet (unchanged when indent is empty).
        """
        if not indent:
            return snippet
        return '\n'.join(
            indent + part if part.strip() else part
            for part in snippet.split('\n')
        )

    def _generate_final_code(self, tree: ast.AST) -> str:
        """
        @brief Generate final Python source from the AST and inject junk code.
        @details
        Unparses the AST, prepends `import random`, and inserts junk snippets
        before eligible lines, leaving a random gap of 5-15 lines between
        insertions. Each snippet is indented to match the line it precedes so
        it stays in the same block. Junk is never placed before blank lines,
        inside or next to triple-quoted literals, inside `try:` bodies, or
        before lines starting with one of _NO_JUNK_PREFIXES.
        @param tree AST after all transformations.
        @return str Final obfuscated Python source code.
        """
        lines = ast.unparse(tree).split('\n')
        in_multiline = False
        skip_for_n = 0
        junk_count = 0
        junk_snippets = []

        # Prepend one line so `random` is imported in the generated module.
        # NOTE(review): presumably the junk snippets rely on it - confirm
        # against JunkGenerator.
        result = ["import random"]

        for i, line in enumerate(lines):
            strip = line.lstrip()

            # Track triple-quoted literals by delimiter parity: a line holding
            # both the opening and closing quotes must not flip the state.
            triple_quotes = strip.count('"""') + strip.count("'''")
            inside_literal = in_multiline
            if triple_quotes % 2:
                in_multiline = not in_multiline

            # Conditions to inject junk (cheap checks first; the backwards
            # try-scan is linear in the line index, so it runs last).
            can_inject = (
                skip_for_n == 0
                and bool(strip)            # blank line: no indent to match
                and triple_quotes == 0     # keeps docstrings first / literals intact
                and not inside_literal
                and not strip.startswith(self._NO_JUNK_PREFIXES)
                and not self._in_try_block(lines, i)
            )

            if can_inject:
                junk = self.junk_gen.generate_junk()
                indent = line[:len(line) - len(strip)]
                result.append(self._indent_snippet(junk, indent))
                junk_snippets.append(junk)
                junk_count += 1
                skip_for_n = random.randint(5, 15)
            else:
                skip_for_n = max(0, skip_for_n - 1)

            result.append(line)

        if self.debug_mode:
            self.debug_data["junk_stats"]["count"] = junk_count
            self.debug_data["junk_stats"]["snippets"] = junk_snippets

        return '\n'.join(result)

    def obfuscate(self, code: str) -> str:
        """
        @brief End-to-end obfuscation entry point.
        @details
        Parses the source, renames/encrypts, flattens control flow and emits
        the final code. In debug mode the collected debug data is written to
        `debug/debug_<timestamp>.json` relative to the working directory.
        @param code Original Python source code.
        @return str Obfuscated Python source code.
        """
        tree = ast.parse(code)

        # Step 1: Rename and encrypt
        tree = self._rename_and_encrypt(tree)

        # Step 2: Flatten control flow
        tree = self._flatten_control_flow(tree)

        # Step 3: Generate final obfuscated code
        final_code = self._generate_final_code(tree)

        if self.debug_mode:
            # NOTE(review): detect_issues() is not called anywhere in this
            # file, so "issues_detected" may be empty in the report - confirm
            # whether a caller is expected to invoke it before obfuscate().
            # ISO timestamps contain ':' which is not allowed in Windows
            # filenames, so it is replaced for the file name only.
            stamp = self.debug_data['timestamp'].replace(":", "-")
            debug_output_path = os.path.join("debug", f"debug_{stamp}.json")
            os.makedirs(os.path.dirname(debug_output_path), exist_ok=True)
            with open(debug_output_path, "w", encoding="utf-8") as debug_file:
                json.dump(self.debug_data, debug_file, indent=4)

        return final_code