mirror of
https://github.com/syssec-utd/pylingual.git
synced 2026-05-11 02:40:13 -07:00
support passing directories to pylingual CLI files list
This commit is contained in:
+55
-11
@@ -55,16 +55,48 @@ def print_result(title: str, results: list[TestResult]):
|
||||
rich.get_console().print(table, justify="center")
|
||||
|
||||
|
||||
def collect_files(paths: list[Path], out_dir: Path, flatten: bool) -> list[tuple[Path, Path]]:
|
||||
file_map: list[tuple[Path, Path]] = []
|
||||
seen_outputs: set[Path] = set()
|
||||
|
||||
def add_file(source: Path, dest: Path):
|
||||
# Resolve collisions by incrementing a counter
|
||||
counter = 1
|
||||
original_stem = dest.stem
|
||||
while dest in seen_outputs:
|
||||
dest = dest.with_stem(f"{original_stem}_{counter}")
|
||||
counter += 1
|
||||
|
||||
file_map.append((source, dest))
|
||||
seen_outputs.add(dest)
|
||||
|
||||
for path in paths:
|
||||
if path.is_file():
|
||||
# individual files are saved directly to the output directory
|
||||
add_file(path, out_dir / f"decompiled_{path.with_suffix('.py').name}")
|
||||
elif path.is_dir():
|
||||
# directories are recursively searched for .pyc files
|
||||
for pyc_path in path.rglob("*.pyc"):
|
||||
target_dir = out_dir
|
||||
if not flatten:
|
||||
# mirror the directory structure
|
||||
target_dir /= pyc_path.relative_to(path).parent
|
||||
add_file(pyc_path, target_dir / f"decompiled_{pyc_path.with_suffix('.py').name}")
|
||||
|
||||
return file_map
|
||||
|
||||
|
||||
@click.command(help="End to end pipeline to decompile Python bytecode into source code.", context_settings={"help_option_names": ["-h", "--help"]})
|
||||
@click.argument("files", nargs=-1)
|
||||
@click.option("-o", "--out-dir", default=None, type=Path, help="The directory to export results to.", metavar="PATH")
|
||||
@click.option("-c", "--config-file", default=None, type=Path, help="Config file for model information.", metavar="PATH")
|
||||
@click.argument("files", type=click.Path(exists=True, path_type=Path), nargs=-1, metavar="PATHS")
|
||||
@click.option("-o", "--out-dir", default=None, type=click.Path(file_okay=False, path_type=Path), help="The directory to export results to.", metavar="PATH")
|
||||
@click.option("-c", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=Path), help="Config file for model information.", metavar="PATH")
|
||||
@click.option("-v", "--version", default=None, type=PythonVersion, help="Python version of the .pyc, default is auto detection.", metavar="VERSION")
|
||||
@click.option("-k", "--top-k", default=10, type=int, help="Maximum number of additional segmentations to consider.", metavar="INT")
|
||||
@click.option("-q", "--quiet", is_flag=True, default=False, help="Suppress console output.")
|
||||
@click.option("--flatten", is_flag=True, default=False, help="Flatten the output directory. (Only used if files list contains directories)")
|
||||
@click.option("--trust-lnotab", is_flag=True, default=False, help="Use the lnotab for segmentation instead of the segmentation model.")
|
||||
@click.option("--init-pyenv", is_flag=True, default=False, help="Install pyenv before decompiling.")
|
||||
def main(files: list[str], out_dir: Path | None, config_file: Path | None, version: PythonVersion | None, top_k: int, trust_lnotab: bool, init_pyenv: bool, quiet: bool):
|
||||
def main(files: list[Path], out_dir: Path | None, config_file: Path | None, version: PythonVersion | None, top_k: int, flatten: bool, trust_lnotab: bool, init_pyenv: bool, quiet: bool):
|
||||
rich.reconfigure(markup=False, emoji=False, quiet=quiet, theme=Theme({"logging.keyword": "yellow not bold"}))
|
||||
console = rich.get_console()
|
||||
log_handler = RichHandler(console=console, rich_tracebacks=True)
|
||||
@@ -101,7 +133,13 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi
|
||||
# the step is not done until the TrackedList is deleted
|
||||
TrackedList.__del__ = lambda self: progress.advance(self.task.id, 9e999)
|
||||
|
||||
n = len(files)
|
||||
tasks_to_process = collect_files(files, out_dir or Path("."), flatten)
|
||||
n = len(tasks_to_process)
|
||||
|
||||
if n == 0:
|
||||
logger.warning("No pyc files found to process.")
|
||||
return
|
||||
|
||||
with Live(Group(Rule(), status, progress), transient=True, console=console, refresh_per_second=12.5) as live:
|
||||
transformers.logging.disable_default_handler()
|
||||
transformers.logging.add_handler(log_handler)
|
||||
@@ -109,19 +147,24 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi
|
||||
progress.add_task(TRANSLATION_STEP, start=False)
|
||||
progress.add_task(CFLOW_STEP, start=False)
|
||||
progress.add_task(CORRECTION_STEP, start=False)
|
||||
for i, file in enumerate(files):
|
||||
|
||||
for i, (pyc_path, save_path) in enumerate(tasks_to_process):
|
||||
for task in progress.tasks:
|
||||
progress.reset(task.id, start=False)
|
||||
pyc_path = Path(file)
|
||||
log_handler.keywords = [file, pyc_path.name, pyc_path.with_suffix(".py").name, "decompiled_" + pyc_path.with_suffix(".py").name]
|
||||
|
||||
# Ensure output directory exists (especially for mirrored structure)
|
||||
save_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
log_handler.keywords = [str(pyc_path), pyc_path.name, pyc_path.with_suffix(".py").name, save_path.name]
|
||||
status.update(f"Decompiling {pyc_path} ({i + 1} / {n})")
|
||||
if not pyc_path.exists():
|
||||
raise FileNotFoundError(f"pyc file {pyc_path} does not exist")
|
||||
logger.error(f"pyc file {pyc_path} does not exist")
|
||||
continue
|
||||
|
||||
try:
|
||||
result = decompile(
|
||||
pyc=pyc_path,
|
||||
save_to=Path(f"{out_dir}/decompiled_{pyc_path.with_suffix('.py').name}" if out_dir else f"decompiled_{pyc_path.with_suffix('.py').name}"),
|
||||
save_to=save_path,
|
||||
config_file=Path(config_file) if config_file else None,
|
||||
version=version,
|
||||
top_k=top_k,
|
||||
@@ -130,10 +173,10 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi
|
||||
pyc = result.original_pyc
|
||||
print_result(f"Equivalence Results for {pyc.pyc_path.name if pyc.pyc_path else repr(pyc)}", result.equivalence_results)
|
||||
except Exception:
|
||||
live.stop()
|
||||
logger.exception(f"Failed to decompile {pyc_path}")
|
||||
console.rule()
|
||||
|
||||
|
||||
def install_pyenv():
|
||||
if shutil.which("pyenv") is not None:
|
||||
logger.warning("pyenv seems to already be installed, ignoring --init-pyenv...")
|
||||
@@ -164,5 +207,6 @@ def install_pyenv():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user