mirror of
https://github.com/syssec-utd/pylingual.git
synced 2026-05-10 18:39:03 -07:00
Merge branch 'dev' into cflow-refactor
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import itertools
|
||||
from typing import TYPE_CHECKING
|
||||
from pathlib import Path
|
||||
|
||||
@@ -179,3 +180,16 @@ class CFG(DiGraph_CFT):
|
||||
dot.write(out, prog=["neato", "-n"], format=CFG.graph_format)
|
||||
else:
|
||||
self.iteration_graphs[-1].append(dot.to_string())
|
||||
|
||||
def cdg(self) -> CFG:
|
||||
pdt = nx.create_empty_copy(self)
|
||||
pdt.add_edges_from((B, A) for A, B in nx.immediate_dominators(self.reverse(), self.end).items())
|
||||
pdt.remove_edge(self.end, self.end)
|
||||
pdr = nx.transitive_closure_dag(pdt)
|
||||
postdominates = lambda A, B: pdr.has_edge(A, B) or A == B
|
||||
control_dependent = lambda A, B: 0 < sum(postdominates(A, succ) for succ in self.successors(B)) < self.out_degree(B)
|
||||
cdg = nx.create_empty_copy(self)
|
||||
cdg.add_edges_from((B, A, {"kind": EdgeKind.Fall}) for A, B in itertools.product(self.nodes, self.nodes) if A != B and control_dependent(A, B))
|
||||
cdg.remove_node(self.end)
|
||||
cdg.add_edges_from(((self.start, n) for n in cdg.nodes if cdg.in_degree(n) == 0 and n != self.start), kind=EdgeKind.Fall)
|
||||
return cdg
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import pdb
|
||||
|
||||
from pylingual.editable_bytecode import EditableBytecode
|
||||
from pylingual.editable_bytecode.control_flow_graph import bytecode_to_control_flow_graph
|
||||
|
||||
@@ -27,13 +25,8 @@ def structure_control_flow(cfg: nx.DiGraph, bytecode: EditableBytecode) -> Contr
|
||||
cfg = CFG.from_graph(cfg, bytecode)
|
||||
runs = get_template_runs(bytecode.version[:2])
|
||||
|
||||
try:
|
||||
while len(cfg) > 1:
|
||||
if not iteration(cfg, runs):
|
||||
return MetaTemplate("\x1b[31mirreducible cflow\x1b[0m", bytecode.codeobj)
|
||||
except Exception:
|
||||
if hasattr(pdb, "xpm"):
|
||||
pdb.xpm() # type: ignore
|
||||
raise
|
||||
while len(cfg) > 1:
|
||||
if not iteration(cfg, runs):
|
||||
return MetaTemplate("\x1b[31mirreducible cflow\x1b[0m", bytecode.codeobj)
|
||||
|
||||
return next(iter(cfg.nodes))
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, override
|
||||
|
||||
import networkx as nx
|
||||
|
||||
from ..cft import ControlFlowTemplate, register_template
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pylingual.control_flow_reconstruction.cfg import CFG
|
||||
|
||||
|
||||
# it's better than nothing
|
||||
@register_template(101, 0)
|
||||
class CDG(ControlFlowTemplate):
|
||||
def __init__(self, cfg: CFG):
|
||||
self.cdg = cfg.cdg()
|
||||
self.start = cfg.start
|
||||
self.blame = cfg.start.blame
|
||||
self.header_lines = self.line("# irreducible cflow, using cdg fallback", meta=True)
|
||||
|
||||
@override
|
||||
@classmethod
|
||||
def try_match(cls, cfg: CFG, node: ControlFlowTemplate) -> ControlFlowTemplate | None:
|
||||
cdg = CDG(cfg)
|
||||
|
||||
if cfg.visualize == cfg._visualize:
|
||||
cfg.remove_edges_from(list(cfg.edges))
|
||||
cfg.add_edges_from(cdg.cdg.edges(data=True))
|
||||
cfg.remove_node(cfg.end)
|
||||
cfg.layout_nodes()
|
||||
cfg.visualize()
|
||||
|
||||
cfg.clear()
|
||||
cfg.add_node(cdg)
|
||||
return cdg
|
||||
|
||||
@override
|
||||
def get_instructions(self):
|
||||
return []
|
||||
|
||||
@override
|
||||
def to_indented_source(self, source):
|
||||
cdg = self.cdg
|
||||
for p, n in nx.dfs_edges(cdg, self.start):
|
||||
cdg.nodes[n]["indent"] = cdg.nodes[p].get("indent", -1) + 1
|
||||
cdg.remove_node(self.start)
|
||||
src = []
|
||||
for n in sorted(cdg.nodes, key=lambda x: x.offset):
|
||||
src.extend(source[n, cdg.nodes[n].get("indent", 0)])
|
||||
return src
|
||||
@@ -107,7 +107,7 @@ class TryElse3_12(ControlFlowTemplate):
|
||||
else:
|
||||
{try_else}
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@register_template(0, 0, (3, 11))
|
||||
class Try3_11(ControlFlowTemplate):
|
||||
|
||||
@@ -130,11 +130,7 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi
|
||||
pyc = result.original_pyc
|
||||
print_result(f"Equivalence Results for {pyc.pyc_path.name if pyc.pyc_path else repr(pyc)}", result.equivalence_results)
|
||||
except Exception:
|
||||
import pdb
|
||||
|
||||
live.stop()
|
||||
if hasattr(pdb, "xpm"):
|
||||
pdb.xpm() # type: ignore
|
||||
logger.exception(f"Failed to decompile {pyc_path}")
|
||||
console.rule()
|
||||
|
||||
|
||||
+6
-4
@@ -306,18 +306,20 @@ def f_nofallthru_if_pass():
|
||||
|
||||
def g1_ifElseLoop():
|
||||
for a in range(3):
|
||||
if a> b:
|
||||
if a > b:
|
||||
print(1)
|
||||
|
||||
|
||||
|
||||
def g2_ifElseLoop():
|
||||
for a in range(3):
|
||||
if a> b:
|
||||
if a > b:
|
||||
print(1)
|
||||
print(2)
|
||||
|
||||
|
||||
def g3_ifElseLoop():
|
||||
for a in range(3):
|
||||
if a> b:
|
||||
if a > b:
|
||||
print(1)
|
||||
else:
|
||||
print(2)
|
||||
|
||||
Reference in New Issue
Block a user