Files
Python-Obfuscation/obfuscator.py
T
zack3d 69184c7cb8
Doxygen to Wiki / Build Doxygen and publish to Wiki (push) Failing after 1m0s
da
2025-08-15 21:06:31 -07:00

301 lines
12 KiB
Python

"""
@file obfuscator.py
@brief Core engine for OMG-Fuscator.
@details Orchestrates the full obfuscation pipeline: symbol analysis and renaming,
string encryption, control-flow flattening, and junk code insertion.
Provides debug instrumentation and JSON reporting when enabled.
"""
import ast
import io
import json
import os
import random
import tokenize
from datetime import datetime

from transformers.rename import RenameTransformer
from transformers.control_flow import ControlFlowFlattener
from transformers.class_analyzer import ClassAnalyzer, update_obfuscator_with_class_mappings
from transformers.attribute_transformer import AttributeTransformer
from transformers.symbol_tree import SymbolTreeBuilder, SymbolTree
from utils.name_gen import NameGenerator
from utils.junk_gen import JunkGenerator
from transformers.class_mapper import apply_class_mapping
class AdvancedObfuscator:
    """
    @brief Core obfuscation engine orchestrating all transformations.
    @details Coordinates name generation, symbol tree construction, AST transformations
             for renaming and string encryption, control-flow flattening, and final
             code generation with junk injection. Optionally records detailed debug data.
    """

    # Line prefixes that end the body of a ``try:`` when scanning backwards.
    _TRY_TERMINATORS = ("except ", "except:", "except*", "finally:")

    # Junk is never inserted directly before a line starting with one of these.
    # Inserting there would split a compound statement (elif/else/except/finally),
    # separate a decorator from its target (@/def/async def/class) or turn a
    # docstring into a plain expression (triple-quoted lines). The remaining
    # entries are kept from the original exclusion list.
    _NO_JUNK_PREFIXES = (
        "def ", "async def ", "class ", "@", "try:",
        "except ", "except:", "except*", "finally:",
        "import ", "from ", "elif ", "else:", "return", "raise",
        "pass", "break", "continue", '"""', "'''",
    )

    def __init__(self, debug_mode=False):
        """
        @brief Create an obfuscator with fresh keys, a name generator and empty debug state.
        @param debug_mode When True, debug data is collected and obfuscate() writes a JSON report.
        """
        self.used_names = set()
        self.name_generator = NameGenerator()

        # Keys for string encryption; every byte is drawn from 65..90 (ASCII 'A'-'Z').
        # The draw order (primary, secondary, salt) is preserved so that a seeded
        # ``random`` yields the same keys as before.
        # NOTE(review): ``random`` is not a cryptographic source - confirm that is acceptable.
        self.primary_key = self._random_key(16)
        self.secondary_key = self._random_key(8)
        self.salt = self._random_key(4)

        # Symbol tree for global tracking of all symbols (built in _rename_and_encrypt)
        self.symbol_tree = None

        # Legacy mappings for backward compatibility
        self.class_attr_mapping = {}
        self.global_var_renames = {}

        # Initialize junk generator
        self.junk_gen = JunkGenerator(self.name_generator)

        # Debugging flags and data structures
        self.debug_mode = debug_mode
        self.debug_data = {
            "timestamp": datetime.now().isoformat(),
            "transformations": [],
            "class_mappings": {},
            "variable_mappings": {},
            "method_mappings": {},
            "string_encryption": [],
            "issues_detected": [],
            "control_flow_stats": {},
            "junk_stats": {"count": 0, "snippets": []}
        }

    @staticmethod
    def _random_key(length):
        """
        @brief Build a random key of the given length.
        @param length Number of bytes to generate.
        @return bytes Key whose bytes each lie in the range 65..90.
        """
        return bytes(random.randint(65, 90) for _ in range(length))

    def log_debug(self, category, data):
        """
        @brief Append a structured debug entry if debug mode is enabled.
        @param category Logical stage or component name.
        @param data Arbitrary JSON-serializable payload to record.
        """
        if not self.debug_mode:
            return
        self.debug_data["transformations"].append({
            "stage": category,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

    def detect_issues(self):
        """
        @brief Run integrity checks and record any issues to debug data.
        @details Aggregates issues from the symbol tree and legacy name-collision
                 checks to help diagnose transformation inconsistencies.
                 Does nothing unless debug mode is enabled.
        """
        if not self.debug_mode:
            return

        issues = self.debug_data["issues_detected"]

        # First check issues in the symbol tree
        if self.symbol_tree:
            issues.extend(self.symbol_tree.check_for_issues())

        # Legacy checks for backwards compatibility
        seen_renames = set()
        duplicates = []
        for renamed in self.global_var_renames.values():
            if renamed in seen_renames:
                duplicates.append(renamed)
            seen_renames.add(renamed)

            # A generated name equal to some original name could lead to conflicts
            if renamed in self.global_var_renames:
                issues.append({
                    "type": "name_collision",
                    "info": f"Renamed value '{renamed}' matches an original name"
                })

        if duplicates:
            issues.append({
                "type": "duplicate_names",
                "names": duplicates
            })

    def _build_symbol_tree(self, tree: ast.AST) -> "SymbolTree":
        """
        @brief Build a global symbol tree from the parsed AST.
        @details Also fills the legacy ``global_var_renames`` and ``class_attr_mapping``
                 dictionaries from the tree's rename mapping.
        @param tree Parsed AST of the input source.
        @return SymbolTree Populated symbol tree with rename mappings and metadata.
        """
        # log_debug() is a no-op outside debug mode, so no extra guard is needed.
        self.log_debug("symbol_tree_building", "Building global symbol tree")

        symbol_tree = SymbolTreeBuilder().build_tree(tree)

        # Apply name generator to all symbols
        symbol_tree.apply_name_generator(self.name_generator)

        # Populate legacy mappings for backward compatibility
        rename_mapping = symbol_tree.get_rename_mapping()
        method_renames = rename_mapping["methods"]
        attribute_renames = rename_mapping["attributes"]

        for class_name, class_obf_name in rename_mapping["classes"].items():
            self.global_var_renames[class_name] = class_obf_name

            # Members are keyed by the obfuscated class name; methods are copied
            # first, then attributes (attributes win on a name clash, as before).
            members = self.class_attr_mapping.setdefault(class_obf_name, {})
            if class_name in method_renames:
                members.update(method_renames[class_name])
            if class_name in attribute_renames:
                members.update(attribute_renames[class_name])

        # Update function and variable mappings
        self.global_var_renames.update(rename_mapping["functions"])
        self.global_var_renames.update(rename_mapping["variables"])

        # Guarded so the counts are only computed when they will be recorded.
        if self.debug_mode:
            self.log_debug("symbol_tree_stats", {
                "classes": len(rename_mapping["classes"]),
                "methods": sum(len(methods) for methods in method_renames.values()),
                "attributes": sum(len(attrs) for attrs in attribute_renames.values()),
                "functions": len(rename_mapping["functions"]),
                "variables": len(rename_mapping["variables"])
            })

        return symbol_tree

    def _rename_and_encrypt(self, tree: ast.AST) -> ast.AST:
        """
        @brief Rename identifiers and encrypt string literals in the AST.
        @param tree Input AST prior to control-flow transformations.
        @return ast.AST Transformed AST with consistent renames and encrypted strings.
        """
        # First, build a comprehensive symbol tree
        self.symbol_tree = self._build_symbol_tree(tree)

        # Now perform the main transformation with consistent mappings
        transformer = RenameTransformer(
            self.name_generator,
            self.global_var_renames,
            self.class_attr_mapping,
            self.primary_key,
            self.secondary_key,
            self.salt,
            debug_mode=self.debug_mode
        )
        tree = transformer.visit(tree)

        # Capture debug data from transformer if available
        if self.debug_mode and hasattr(transformer, 'debug_data'):
            self.debug_data["variable_mappings"] = transformer.debug_data.get("variable_mappings", {})
            self.debug_data["string_encryption"] = transformer.debug_data.get("string_encryption", [])

        ast.fix_missing_locations(tree)
        return tree

    def _flatten_control_flow(self, tree: ast.AST) -> ast.AST:
        """
        @brief Flatten control flow into a state-machine dispatch form.
        @param tree AST after renaming/encryption.
        @return ast.AST Transformed AST with flattened control flow.
        """
        flattener = ControlFlowFlattener(debug_mode=self.debug_mode)
        tree = flattener.visit(tree)

        # Capture debug data from flattener if available
        if self.debug_mode and hasattr(flattener, 'debug_data'):
            self.debug_data["control_flow_stats"] = flattener.debug_data

        ast.fix_missing_locations(tree)
        return tree

    @classmethod
    def _in_try_block(cls, lines, index):
        """
        @brief Heuristically decide whether a line sits inside the body of a ``try:``.
        @details Scans backwards over lines indented no deeper than the current one.
                 The first ``except``/``finally`` line means "not in a try body";
                 the first ``try:`` line means "in a try body".
        @param lines All generated source lines.
        @param index Index of the line being examined.
        @return bool True when a governing ``try:`` is found first.
        """
        current = lines[index]
        current_indent = len(current) - len(current.lstrip())
        for position in range(index - 1, -1, -1):
            candidate = lines[position]
            stripped = candidate.lstrip()
            if len(candidate) - len(stripped) > current_indent:
                continue
            if stripped.startswith(cls._TRY_TERMINATORS):
                return False
            if stripped.startswith("try:"):
                return True
        return False

    @staticmethod
    def _string_continuation_rows(source):
        """
        @brief Find the rows that lie inside a multi-line token.
        @details A token whose end row is greater than its start row can only be a
                 multi-line string piece; every row after its first is unsafe for
                 junk insertion because the text would land inside the literal.
        @param source Generated Python source.
        @return set 1-based row numbers that continue a multi-line token.
        """
        rows = set()
        try:
            for token in tokenize.generate_tokens(io.StringIO(source).readline):
                first_row, last_row = token.start[0], token.end[0]
                if last_row > first_row:
                    rows.update(range(first_row + 1, last_row + 1))
        except (tokenize.TokenError, SyntaxError):
            # Best effort: keep what was collected instead of aborting code generation.
            pass
        return rows

    @staticmethod
    def _header_line_count(tree):
        """
        @brief Count the leading output lines that must stay ahead of any other statement.
        @details The header is the module docstring (if any) followed by any
                 ``from __future__ import ...`` statements.
        @param tree AST that is about to be unparsed.
        @return int Number of lines the header occupies in ``ast.unparse(tree)``.
        """
        body = getattr(tree, "body", None)
        if not isinstance(tree, ast.Module) or not body:
            return 0

        count = 0
        first = body[0]
        if (
            isinstance(first, ast.Expr)
            and isinstance(first.value, ast.Constant)
            and isinstance(first.value.value, str)
        ):
            count = 1
        while (
            count < len(body)
            and isinstance(body[count], ast.ImportFrom)
            and body[count].module == "__future__"
        ):
            count += 1
        if count == 0:
            return 0

        header = ast.Module(body=body[:count], type_ignores=[])
        return len(ast.unparse(header).split('\n'))

    @staticmethod
    def _indent_block(snippet, indent):
        """
        @brief Prefix every non-blank line of a snippet with the given indentation.
        @param snippet Source text, possibly spanning several lines.
        @param indent Whitespace prefix to apply.
        @return str The re-indented snippet (unchanged when indent is empty).
        """
        if not indent:
            return snippet
        return '\n'.join(
            indent + row if row.strip() else row
            for row in snippet.split('\n')
        )

    def _generate_final_code(self, tree: ast.AST) -> str:
        """
        @brief Generate final Python source from the AST and inject junk code.
        @details ``import random`` is emitted as the first statement, but after a
                 module docstring and any ``from __future__`` imports so those keep
                 their meaning. Junk snippets are indented to match the line they
                 precede and are never placed inside a multi-line string.
        @param tree AST after all transformations.
        @return str Final obfuscated Python source code.
        """
        source = ast.unparse(tree)
        lines = source.split('\n')
        continuation_rows = self._string_continuation_rows(source)
        import_at = self._header_line_count(tree)

        cooldown = 0  # lines that must pass before the next junk snippet
        junk_snippets = []
        result = []

        for index, line in enumerate(lines):
            # Header lines are never junk targets (docstring / "from " prefixes),
            # so nothing that needs ``random`` can precede this import.
            if index == import_at:
                result.append("import random")

            stripped = line.lstrip()

            # Conditions to inject junk (cheapest checks first)
            can_inject = (
                cooldown == 0
                and (index + 1) not in continuation_rows
                and not stripped.startswith(self._NO_JUNK_PREFIXES)
                and not self._in_try_block(lines, index)
            )

            if can_inject:
                junk = self.junk_gen.generate_junk()
                indent = line[:len(line) - len(stripped)]
                result.append(self._indent_block(junk, indent))
                junk_snippets.append(junk)
                cooldown = random.randint(5, 15)
            else:
                cooldown = max(0, cooldown - 1)

            result.append(line)

        # The module consisted only of header lines.
        if import_at >= len(lines):
            result.append("import random")

        if self.debug_mode:
            self.debug_data["junk_stats"]["count"] = len(junk_snippets)
            self.debug_data["junk_stats"]["snippets"] = junk_snippets

        return '\n'.join(result)

    def _write_debug_report(self):
        """
        @brief Write the collected debug data to ``debug/debug_<timestamp>.json``.
        @details Colons in the ISO timestamp are replaced with dashes because ``:``
                 is not a valid file-name character on Windows.
        @return str Path of the written report.
        """
        safe_stamp = self.debug_data['timestamp'].replace(":", "-")
        debug_output_path = os.path.join("debug", f"debug_{safe_stamp}.json")
        os.makedirs(os.path.dirname(debug_output_path), exist_ok=True)
        with open(debug_output_path, "w", encoding="utf-8") as debug_file:
            json.dump(self.debug_data, debug_file, indent=4)
        return debug_output_path

    def obfuscate(self, code: str) -> str:
        """
        @brief End-to-end obfuscation entry point.
        @details In debug mode the collected debug data is also written as JSON
                 into a ``debug`` directory relative to the working directory.
        @param code Original Python source code.
        @return str Obfuscated Python source code.
        """
        tree = ast.parse(code)

        # Step 1: Rename and encrypt
        tree = self._rename_and_encrypt(tree)

        # Step 2: Flatten control flow
        tree = self._flatten_control_flow(tree)

        # Step 3: Generate final obfuscated code
        final_code = self._generate_final_code(tree)

        if self.debug_mode:
            self._write_debug_report()

        return final_code