def collect_files(paths: list[Path], out_dir: Path, flatten: bool) -> list[tuple[Path, Path]]:
    """Map every input to decompile onto a unique output path.

    Args:
        paths: Files and/or directories given by the caller. A file is taken
            as-is; a directory is searched recursively for ``*.pyc`` files.
            Paths that are neither a file nor a directory are ignored.
        out_dir: Directory under which all output paths are placed.
        flatten: When false, files found inside a directory keep their
            position relative to that directory below ``out_dir``. When true,
            everything is placed directly in ``out_dir``.

    Returns:
        ``(source, destination)`` pairs in processing order. Each destination
        is named ``decompiled_<source name with .py suffix>``; if that name is
        already taken, ``_1``, ``_2``, ... is appended to the stem until it is
        unique. Each distinct source file appears at most once.
    """
    file_map: list[tuple[Path, Path]] = []
    seen_sources: set[Path] = set()
    seen_outputs: set[Path] = set()

    def add_file(source: Path, dest: Path) -> None:
        # The same file can be reached twice (listed twice, or listed and also
        # found inside a listed directory); queue it only once.
        source_key = source.resolve()
        if source_key in seen_sources:
            return
        seen_sources.add(source_key)

        # Resolve collisions by incrementing a counter
        original_stem = dest.stem
        counter = 1
        while dest in seen_outputs:
            dest = dest.with_stem(f"{original_stem}_{counter}")
            counter += 1

        seen_outputs.add(dest)
        file_map.append((source, dest))

    for path in paths:
        if path.is_file():
            # individual files are saved directly to the output directory
            add_file(path, out_dir / f"decompiled_{path.with_suffix('.py').name}")
        elif path.is_dir():
            # rglob order depends on the filesystem; sorting keeps the
            # collision suffixes attached to the same files on every run.
            for pyc_path in sorted(path.rglob("*.pyc")):
                # the pattern also matches directories / dangling links
                if not pyc_path.is_file():
                    continue
                if flatten:
                    target_dir = out_dir
                else:
                    # mirror the directory structure
                    target_dir = out_dir / pyc_path.relative_to(path).parent
                add_file(pyc_path, target_dir / f"decompiled_{pyc_path.with_suffix('.py').name}")

    return file_map
type=click.Path(exists=True, dir_okay=False, path_type=Path), help="Config file for model information.", metavar="PATH") @click.option("-v", "--version", default=None, type=PythonVersion, help="Python version of the .pyc, default is auto detection.", metavar="VERSION") @click.option("-k", "--top-k", default=10, type=int, help="Maximum number of additional segmentations to consider.", metavar="INT") @click.option("-q", "--quiet", is_flag=True, default=False, help="Suppress console output.") +@click.option("--flatten", is_flag=True, default=False, help="Flatten the output directory. (Only used if files list contains directories)") @click.option("--trust-lnotab", is_flag=True, default=False, help="Use the lnotab for segmentation instead of the segmentation model.") @click.option("--init-pyenv", is_flag=True, default=False, help="Install pyenv before decompiling.") -def main(files: list[str], out_dir: Path | None, config_file: Path | None, version: PythonVersion | None, top_k: int, trust_lnotab: bool, init_pyenv: bool, quiet: bool): +def main(files: list[Path], out_dir: Path | None, config_file: Path | None, version: PythonVersion | None, top_k: int, flatten: bool, trust_lnotab: bool, init_pyenv: bool, quiet: bool): rich.reconfigure(markup=False, emoji=False, quiet=quiet, theme=Theme({"logging.keyword": "yellow not bold"})) console = rich.get_console() log_handler = RichHandler(console=console, rich_tracebacks=True) @@ -101,7 +133,13 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi # the step is not done until the TrackedList is deleted TrackedList.__del__ = lambda self: progress.advance(self.task.id, 9e999) - n = len(files) + tasks_to_process = collect_files(files, out_dir or Path("."), flatten) + n = len(tasks_to_process) + + if n == 0: + logger.warning("No pyc files found to process.") + return + with Live(Group(Rule(), status, progress), transient=True, console=console, refresh_per_second=12.5) as live: 
transformers.logging.disable_default_handler() transformers.logging.add_handler(log_handler) @@ -109,19 +147,24 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi progress.add_task(TRANSLATION_STEP, start=False) progress.add_task(CFLOW_STEP, start=False) progress.add_task(CORRECTION_STEP, start=False) - for i, file in enumerate(files): + + for i, (pyc_path, save_path) in enumerate(tasks_to_process): for task in progress.tasks: progress.reset(task.id, start=False) - pyc_path = Path(file) - log_handler.keywords = [file, pyc_path.name, pyc_path.with_suffix(".py").name, "decompiled_" + pyc_path.with_suffix(".py").name] + + # Ensure output directory exists (especially for mirrored structure) + save_path.parent.mkdir(parents=True, exist_ok=True) + + log_handler.keywords = [str(pyc_path), pyc_path.name, pyc_path.with_suffix(".py").name, save_path.name] status.update(f"Decompiling {pyc_path} ({i + 1} / {n})") if not pyc_path.exists(): - raise FileNotFoundError(f"pyc file {pyc_path} does not exist") + logger.error(f"pyc file {pyc_path} does not exist") + continue try: result = decompile( pyc=pyc_path, - save_to=Path(f"{out_dir}/decompiled_{pyc_path.with_suffix('.py').name}" if out_dir else f"decompiled_{pyc_path.with_suffix('.py').name}"), + save_to=save_path, config_file=Path(config_file) if config_file else None, version=version, top_k=top_k, @@ -130,10 +173,10 @@ def main(files: list[str], out_dir: Path | None, config_file: Path | None, versi pyc = result.original_pyc print_result(f"Equivalence Results for {pyc.pyc_path.name if pyc.pyc_path else repr(pyc)}", result.equivalence_results) except Exception: - live.stop() logger.exception(f"Failed to decompile {pyc_path}") console.rule() + def install_pyenv(): if shutil.which("pyenv") is not None: logger.warning("pyenv seems to already be installed, ignoring --init-pyenv...") @@ -164,5 +207,6 @@ def install_pyenv(): return False return True + if __name__ == "__main__": main()