mirror of
https://github.com/syssec-utd/pylingual.git
synced 2026-05-10 18:39:03 -07:00
cflow script
This commit is contained in:
+20
-27
@@ -1,5 +1,5 @@
|
||||
import contextlib
|
||||
import difflib
|
||||
import shutil
|
||||
import json
|
||||
import multiprocessing
|
||||
import os
|
||||
@@ -20,7 +20,7 @@ from pylingual.control_flow_reconstruction.structure import bc_to_cft
|
||||
from pylingual.main import print_result
|
||||
from pylingual.control_flow_reconstruction.source import SourceContext
|
||||
from pylingual.editable_bytecode import PYCFile
|
||||
from pylingual.equivalence_check import compare_pyc
|
||||
from pylingual.equivalence_check import TestResult, compare_pyc
|
||||
from pylingual.utils.version import PythonVersion
|
||||
from pylingual.utils.generate_bytecode import compile_version, CompileError
|
||||
from dataset_generation.normalize_source import normalize_source
|
||||
@@ -73,10 +73,6 @@ def run(file: Path, out_dir: Path, version: PythonVersion, print=False):
|
||||
|
||||
cfts = {bc.codeobj: bc_to_cft(bc) for bc in pyc.iter_bytecodes()}
|
||||
out_src = str(SourceContext(pyc, src_lines, cfts))
|
||||
try:
|
||||
out_src = normalize_source(out_src)
|
||||
except:
|
||||
pass
|
||||
|
||||
out_path = out_dir / "b.py"
|
||||
out_path.write_text(out_src, encoding="utf-8")
|
||||
@@ -85,34 +81,32 @@ def run(file: Path, out_dir: Path, version: PythonVersion, print=False):
|
||||
result = compare_pyc(in_pyc, out_pyc)
|
||||
if print:
|
||||
print_result(f"Equivalance results for {file}", result)
|
||||
return Result.Success if all(x.success for x in result) else Result.Failure, file, out_dir
|
||||
return Result.Success if all(x.success for x in result) else Result.Failure, result, file, out_dir
|
||||
except (CompileError, SyntaxError):
|
||||
return Result.CompileError, file, out_dir
|
||||
return Result.CompileError, [], file, out_dir
|
||||
except Exception:
|
||||
rich.get_console().print_exception()
|
||||
return Result.Error, file, out_dir
|
||||
return Result.Error, [], file, out_dir
|
||||
|
||||
|
||||
class NoPool:
|
||||
imap_unordered = map
|
||||
|
||||
|
||||
def print_diff(a: Path, b: Path):
|
||||
def print_results(a: Path, b: Path, result: Result, results: list[TestResult]):
|
||||
a_text = a.read_text()
|
||||
b_text = b.read_text()
|
||||
a_lines = a_text.split("\n")
|
||||
b_lines = b_text.split("\n")
|
||||
console = rich.console.Console(highlight=False)
|
||||
print(a_text)
|
||||
print("=" * 40)
|
||||
print(b_text)
|
||||
print("=" * 40)
|
||||
line = None
|
||||
for line in difflib.unified_diff(a_lines, b_lines, str(a), str(b)):
|
||||
style = "red" if line[0] == "-" else "green" if line[0] == "+" else "blue" if line[0] == "@" else ""
|
||||
console.print(line, style=style)
|
||||
if line is None:
|
||||
print("equal")
|
||||
console.print("=== original file ===", style='green bold')
|
||||
console.print(a_text)
|
||||
console.print("\n=== reconstructed file ===", style='green bold')
|
||||
console.print(b_text)
|
||||
console.print("\n=== equivalence report ===", style='green bold')
|
||||
if results:
|
||||
for x in results:
|
||||
console.print(str(x), style='' if x.success else 'red bold underline')
|
||||
else:
|
||||
console.print(result, style='red bold underline')
|
||||
|
||||
|
||||
def get_unused(a: Path, _=True):
|
||||
@@ -151,6 +145,7 @@ def main(input: Path, output: str, version: PythonVersion, graph: str | None, pr
|
||||
TextColumn("• [green]Success Rate:{task.fields[srate]:>3.2f}%"), # success rate
|
||||
]
|
||||
if graph:
|
||||
shutil.rmtree(prefix / graph, ignore_errors=True)
|
||||
CFG.enable_graphing(prefix / graph, graph_format)
|
||||
if input.is_file() and input.suffix == ".py":
|
||||
if output:
|
||||
@@ -159,10 +154,8 @@ def main(input: Path, output: str, version: PythonVersion, graph: str | None, pr
|
||||
out = TemporaryDirectory()
|
||||
with out as o:
|
||||
o = Path(o)
|
||||
results = run(input, o, version)[0]
|
||||
if results in [Result.CompileError, Result.Error]:
|
||||
print(results)
|
||||
print_diff(o / input.stem / "a.py", o / input.stem / "b.py")
|
||||
result, eqr, _, _ = run(input, o, version)
|
||||
print_results(o / input.stem / "a.py", o / input.stem / "b.py", result, eqr)
|
||||
else:
|
||||
if not output:
|
||||
out_dir = get_unused(prefix / str(version) / input.stem)
|
||||
@@ -187,7 +180,7 @@ def main(input: Path, output: str, version: PythonVersion, graph: str | None, pr
|
||||
succs = 0
|
||||
processed = 0
|
||||
|
||||
for result, input, od in p.imap_unordered(f, files):
|
||||
for result, _, input, od in p.imap_unordered(f, files):
|
||||
results[result].append(input)
|
||||
processed += 1
|
||||
dir_map[str(input)] = str(od)
|
||||
|
||||
Reference in New Issue
Block a user