README.md

4 min read Original article ↗
#!/usr/bin/env python from __future__ import annotations import argparse import json import os import re import subprocess import time import traceback from pathlib import Path def resolve_repo_path(path: str, repo_root: Path) -> Path: candidate = Path(path) return candidate if candidate.is_absolute() else (repo_root / candidate) def get_spec_list(specs_dir: Path) -> list[str]: if not specs_dir.exists(): raise FileNotFoundError(f"Specs directory not found: {specs_dir}") specs: list[str] = [] for path in sorted(specs_dir.glob("*.md")): if path.name in {"README.md", "done.md"}: continue if re.match(r"^\d{4}-.*\.md$", path.name): specs.append(str(path.as_posix())) if not specs: raise ValueError(f"No specs found in {specs_dir}") return specs def build_prompt(spec_path: str, phrase: str) -> str: return ( f"Implement spec: {spec_path}\n\n" "Requirements:\n" "- Read and follow the spec.\n" "- Work in this repo.\n" "- Commit when complete with a clear message.\n" "- Write useful learnings for future runs to AGENTS.md.\n" f"- After committing, print only the magic phrase: {phrase}\n" "- Do not print the magic phrase before the commit.\n" ) def load_done(done_path: Path) -> set[str]: if not done_path.exists(): done_path.parent.mkdir(parents=True, exist_ok=True) done_path.write_text("", encoding="utf-8") return set() done: set[str] = set() for line in done_path.read_text(encoding="utf-8").splitlines(): match = re.match(r"^\s*-\s+(.+)$", line) if match: done.add(match.group(1).strip()) return done def append_done(done_path: Path, spec: str) -> None: with done_path.open("a", encoding="utf-8") as handle: handle.write(f"- {spec}\n") def append_log(log_path: Path, text: str) -> None: log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open("a", encoding="utf-8") as handle: handle.write(text) def run_codex(codex_exe: str, codex_args: list[str], prompt: str) -> tuple[int, str]: process = subprocess.run( [codex_exe, *codex_args, "-"], input=prompt, text=True, capture_output=True, cwd=os.getcwd(), ) output = (process.stdout or "") + (process.stderr or "") return process.returncode, output def parse_reset_seconds(text: str) -> int | None: match = re.search(r'resets_in_seconds"\s*:\s*(\d+)', text) if match: return int(match.group(1)) match = re.search(r'resets_at"\s*:\s*(\d+)', text) if match: reset_epoch = int(match.group(1)) return max(0, reset_epoch - int(time.time())) for line in text.splitlines(): line = line.strip() if not (line.startswith("{") and line.endswith("}")): continue try: payload = json.loads(line) except json.JSONDecodeError: continue if isinstance(payload, dict): reset_seconds = payload.get("resets_in_seconds") if isinstance(reset_seconds, int): return reset_seconds reset_epoch = payload.get("resets_at") if isinstance(reset_epoch, int): return max(0, reset_epoch - int(time.time())) return None def main() -> int: parser = argparse.ArgumentParser(description="Run specs sequentially with Codex.") parser.add_argument("--magic-phrase", default="SPEC_COMPLETE") parser.add_argument("--codex-exe", default="codex") parser.add_argument( "--codex-args", default="exec --dangerously-bypass-approvals-and-sandbox", help="Space-separated codex args, e.g. 'exec --full-auto -m gpt-5.2-codex'", ) parser.add_argument("--specs-dir", default="specs") parser.add_argument("--max-attempts-per-spec", type=int, default=5) parser.add_argument("--log-path", default="specs/agent-run.log") parser.add_argument("--done-path", default="specs/done.md") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() repo_root = Path(__file__).resolve().parent os.chdir(repo_root) specs_dir = resolve_repo_path(args.specs_dir, repo_root) log_path = resolve_repo_path(args.log_path, repo_root) done_path = resolve_repo_path(args.done_path, repo_root) specs = get_spec_list(specs_dir) done_set = load_done(done_path) codex_args = args.codex_args.split() if not shutil_which(args.codex_exe): raise FileNotFoundError(f"Codex executable not found on PATH: {args.codex_exe}") completed_count = 0 skipped_count = 0 failed_count = 0 total_specs = len(specs) for spec in specs: if spec in done_set: skipped_count += 1 print(f"[skip] already done: {spec}") continue attempt = 1 done = False while not done: if attempt > args.max_attempts_per_spec: failed_count += 1 raise RuntimeError(f"Max attempts exceeded for spec: {spec}") prompt = build_prompt(spec, args.magic_phrase) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") append_log(log_path, f"=== {timestamp} | {spec} | attempt {attempt} ===\n") if args.dry_run: print(f"[dry-run] {spec} (attempt {attempt})") break progress_index = completed_count + skipped_count + 1 print(f"[start] Spec {progress_index} of {total_specs} | attempt {attempt} :: {spec}") try: exit_code, output_text = run_codex(args.codex_exe, codex_args, prompt) except Exception: output_text = "[exception] codex invocation failed\n" + traceback.format_exc() append_log(log_path, output_text + ("\n" if not output_text.endswith("\n") else "")) print(f"[error] exception during codex run for {spec}") attempt += 1 continue append_log(log_path, output_text + ("\n" if not output_text.endswith("\n") else "")) usage_limit = ( "usage_limit_reached" in output_text or "Too Many Requests" in output_text or "You've hit your usage limit" in output_text ) if usage_limit: reset_seconds = parse_reset_seconds(output_text) if reset_seconds is None: wait_seconds = 60 * 60 print(f"[wait] usage limit reached; sleeping {wait_seconds} seconds before retry") time.sleep(wait_seconds) attempt += 1 continue wait_seconds = reset_seconds + 30 print(f"[wait] usage limit reached; sleeping {wait_seconds} seconds before retry") time.sleep(wait_seconds) attempt += 1 continue if exit_code != 0: print(f"[error] codex exit code {exit_code} for {spec}") attempt += 1 continue if args.magic_phrase in output_text: done = True completed_count += 1 append_done(done_path, spec) print(f"[done] {spec}") continue print(f"[retry] magic phrase not found for {spec}") attempt += 1 print("=== Summary ===") print(f"Completed: {completed_count}") print(f"Skipped: {skipped_count}") print(f"Failed: {failed_count}") return 0 def shutil_which(executable: str) -> str | None: for path in os.environ.get("PATH", "").split(os.pathsep): candidate = Path(path) / executable if candidate.exists(): return str(candidate) if os.name == "nt": for ext in (".exe", ".cmd", ".bat"): if candidate.with_suffix(ext).exists(): return str(candidate.with_suffix(ext)) return None if __name__ == "__main__": raise SystemExit(main())