AI agents retry. Side effects shouldn't.
Idempotent tool execution for AI agents
Safe retries, crash recovery, and multi-worker coordination.
Idempotency + replay • same tool + args (+ scope) → same effect → return recorded result
Intent-bound approvals • approve the exact tool call + args (no “approve X, run Y”)
Queryable receipts • every side effect recorded (inputs, outputs, timing, status)
Framework-agnostic: wrap any async Python tool handler. Examples: LangGraph / OpenAI / CrewAI / OpenAI Agents SDK.
Table of Contents
- Interactive Demo (2 Minutes)
- Why This Exists
- Installation
- Quick Start
- The "Pause Button" for Your Agent (Human-in-the-loop)
- With LangChain
- How It Works
- More Examples
- When to Use This
- Why Not...?
- FAQ
- Core API
- License
Interactive Demo (2 Minutes)
If you want to feel the behavior before integrating, run the official interactive demo:
git clone https://github.com/rune0-dev/agent-ledger-demo cd agent-ledger-demo python3 -m venv .venv source .venv/bin/activate python -m pip install -e . export OPENAI_API_KEY='sk-...' agent
Try this sequence:
charge customer Acme 100k usd(approval requested + effect created)- Approve with
ywhen prompted - Repeat the exact same request (dedup replay, no duplicate charge)
/effects(inspect recorded effects and dedup count)
See the full demo source in agent-ledger-demo.
Why This Exists
LLMs are non-deterministic. Your APIs aren't.
When an agent crashes or retries, it doesn't remember what it already did:
- A tool call times out → the agent retries → the side effect happens twice (charge/email/ticket).
- The run crashes mid-step → you can’t tell which tools executed (no receipts).
- Multiple workers pick up the same task → duplicate external actions (at-least-once).
- A human approves one action → the agent runs a different one (arg drift).
agent-ledger sits between your agent and the outside world:
| Problem | How agent-ledger helps |
|---|---|
| Agent retried → API called twice | Idempotency: same tool + args (+ scope) → same effect → recorded result replayed, handler skipped |
| "What did the agent actually do?" | Audit trail: every call recorded with inputs, outputs, timing |
| Two workers hit the same task | Concurrency: first writer wins, others wait for the recorded result |
| Process crashed mid-execution | Crash recovery: new process reads the ledger, resumes safely |
| Agent deployed without permission | Approvals: sensitive tools pause until a human approves |
Installation
# To use in dev-mode with MemoryStore pip install agent-ledger # To use in production-mode with PostgresDB pip install agent-ledger[postgres]
Quick Start
import asyncio from psycopg_pool import AsyncConnectionPool from agent_ledger import EffectLedger, EffectLedgerOptions, ToolCall from agent_ledger.stores.postgres import PostgresStore, SCHEMA_SQL async def charge_customer(effect): print(f"Charging {effect.args_canonical}...") return {"status": "charged", "id": "ch_123"} async def main(): # Connect to Postgres pool = AsyncConnectionPool(conninfo="postgresql://localhost/mydb") async with pool.connection() as conn: await conn.execute(SCHEMA_SQL) # Create table if needed store = PostgresStore(pool=pool) ledger = EffectLedger(EffectLedgerOptions(store=store)) # First call: executes the handler result = await ledger.run( ToolCall( workflow_id="order-123", tool="stripe.charge", args={"amount": 1000, "currency": "usd"}, ), handler=charge_customer, ) print(f"First call: {result}") # Second call: same inputs → returns recorded result (handler not called) result2 = await ledger.run( ToolCall( workflow_id="order-123", tool="stripe.charge", args={"amount": 1000, "currency": "usd"}, ), handler=charge_customer, ) print(f"Second call: {result2}") # Handler only executed once. No double charge. asyncio.run(main()) # For quick prototyping without Postgres: # from agent_ledger import MemoryStore # store = MemoryStore() # In-memory, not durable
Output:
Charging {"amount":1000,"currency":"usd"}...
First call: {'status': 'charged', 'id': 'ch_123'}
Second call: {'status': 'charged', 'id': 'ch_123'}
Same inputs → same hash → same result. The handler only runs once.
The "Pause Button" for Your Agent (Human-in-the-loop)
High-stakes operations can require human approval before execution. Use the requires_approval policy hook to decide dynamically:
from agent_ledger import LedgerHooks, ToolCall # Define a policy: which tool calls need approval? def approval_policy(call: ToolCall) -> bool: # Large payments need approval if call.tool == "stripe.charge" and call.args.get("amount", 0) > 1000: return True # Production deploys need approval if call.tool == "k8s.deploy" and call.args.get("env") == "production": return True return False # Notification hook: fires when approval is required async def notify_slack(effect): await slack.post_message( channel="#approvals", text=f"Approval needed: {effect.tool}", blocks=[...], # Include approve/deny buttons with effect.idem_key ) # Combine policy + notification in hooks hooks = LedgerHooks( requires_approval=approval_policy, # Policy: decides IF approval needed on_approval_required=notify_slack, # Notification: fires WHEN approval needed ) result = await ledger.run( ToolCall( workflow_id="deploy-prod", tool="k8s.deploy", args={"image": "app:v2", "env": "production"}, ), handler=deploy_to_k8s, hooks=hooks, ) # Slack bot side: handle button click @slack_app.action("approve") async def handle_approve(ack, body): await ack() idem_key = body["actions"][0]["value"] # From button payload await ledger.approve(idem_key) await slack.post_message(channel="#deployments", text=f"✅ Approved: {idem_key}") @slack_app.action("deny") async def handle_deny(ack, body): await ack() idem_key = body["actions"][0]["value"] await ledger.deny(idem_key, reason="Denied by operator") await slack.post_message(channel="#deployments", text=f"❌ Denied: {idem_key}")
The agent waits. The human decides. The ledger records everything.
Hook types:
requires_approval: Policy hook that returnsbool. Called for fresh effects only (not replays).on_approval_required: Notification hook that fires once when approval is needed. Errors are logged but don't abort the run.
run() polls until the effect is approved, with exponential backoff. By default, approval waits indefinitely (approval_timeout_s=None). Configurable via RunOptions.concurrency (see Configuration). After approval, the handler executes and the result is returned.
Key flow: The notification hook receives effect.idem_key—this is the approval handle. External systems (Slack, admin panels, CLIs) store this key in button payloads/URLs and pass it to approve(idem_key) or deny(idem_key, reason).
Intent-bound approval: The approval is tied to the exact payload hash. If the agent retries with different arguments, that's a new approval request—not a bypass of the previous one.
For static "always require approval", use: hooks=LedgerHooks(requires_approval=lambda _: True)
With LangChain
from agent_ledger import EffectLedger, EffectLedgerOptions, MemoryStore, ToolCall ledger = EffectLedger(EffectLedgerOptions(store=MemoryStore())) async def send_email(to: str, subject: str, body: str) -> str: # Your actual email-sending logic return f"Email sent to {to}" async def execute_tool_safely(tool_name: str, args: dict, workflow_id: str): """Wrap any async function with idempotency.""" async def handler(_): return await send_email(**args) return await ledger.run( ToolCall(workflow_id=workflow_id, tool=tool_name, args=args), handler=handler, ) # In your agent's tool execution loop: result = await execute_tool_safely( tool_name="send_email", args={"to": "customer@example.com", "subject": "Order confirmed", "body": "..."}, workflow_id="order-456", ) # Retries won't send duplicate emails
This pattern works anywhere you control the tool call boundary (LangChain/LangGraph/CrewAI/your own loop).
See examples/ for framework-specific integrations.
How It Works
Every tool call becomes a transaction in the ledger:
ToolCall(workflow_id, tool, args)
│
▼
SHA256(workflow_id | tool | args) → idem_key
│
▼
┌─────────┐
│ LEDGER │
└────┬────┘
│
┌──────────┼──────────┐
│ │ │
fresh in-flight terminal
│ │ │
▼ ▼ ▼
execute wait replay
handler for result recorded
Effect lifecycle:
processing → succeeded
→ failed
→ requires_approval → ready → succeeded/failed
→ denied
→ canceled
Design Constraints
- At-most-once commit per idem_key: Each unique
(workflow_id, tool, args)tuple is recorded at most once, enforced by the store's unique constraint onidem_keyand atomic upsert semantics - Exactly-once execution: Depends on your handler being idempotent or the downstream API supporting idempotency keys. If it does, passing down effect.idem_key would ensure exactly-once execution.
- Deterministic canonicalization: Args are JSON-serialized with sorted keys; non-deterministic values (timestamps, UUIDs) in args will create new records
- Multi-tenant isolation: Include tenant/user/principal in
workflow_id(or inargs) to prevent cross-actor deduplication. The library does not enforce tenant boundaries—your application must scopeworkflow_idappropriately
More Examples
Custom Idempotency Keys
Use only specific fields to produce the effect hash—ignore the rest:
await ledger.run( ToolCall( workflow_id="ticket-456", tool="github.create_issue", args={"owner": "acme", "repo": "app", "title": "Bug", "body": "Details..."}, idempotency_keys=["owner", "repo", "title"], # body changes won't re-execute; would compute to the same hash. ), handler=create_issue, )
PostgreSQL for Production
Your agent's state belongs in Postgres, not ephemeral memory:
from psycopg_pool import AsyncConnectionPool from agent_ledger.stores.postgres import PostgresStore, SCHEMA_SQL pool = AsyncConnectionPool(conninfo="postgresql://localhost/mydb") async with pool.connection() as conn: await conn.execute(SCHEMA_SQL) store = PostgresStore(pool=pool) ledger = EffectLedger(EffectLedgerOptions(store=store)) # Query your audit trail with SQL
Fine-Grained Control
For custom execution logic:
from agent_ledger import CommitSucceeded, CommitFailed, EffectError begin_result = await ledger.begin(call) if begin_result.cached: return begin_result.cached_result try: result = await execute_tool(call.args) await ledger.commit(begin_result.effect.id, CommitSucceeded(result=result)) return result except Exception as e: await ledger.commit( begin_result.effect.id, CommitFailed(error=EffectError(message=str(e))), ) raise
When to Use This
Good fit:
- Agents calling payment APIs, sending emails, creating tickets
- Workflows requiring human-in-the-loop oversight
- Workflows that retry on failure or resume after crashes
- Operations requiring human sign-off before execution
- Systems needing audit trails of what the agent did
Probably not needed:
- Read-only agents (RAG, summarization, search)
- One-off scripts without retry logic
- Prototypes where duplicates are acceptable
Why Not...?
| Alternative | What's missing |
|---|---|
| Retry libraries (Tenacity, Stamina) | Retry the call, but don't deduplicate across processes or restarts |
| In-memory cache | Lost on restart, can't coordinate multiple workers |
| DB unique constraints | Good start, but no lifecycle states, result caching, or approval flows |
| Workflow engines (Temporal, Celery) | Full orchestration systems; agent-ledger is a lightweight layer you can use inside them |
Library-only: no sidecar, no agent runtime, no SaaS. Bring your own store (Memory/Postgres). pip install and go.
FAQ
Does this replace Temporal?
No. Temporal is a full workflow orchestration engine. agent-ledger is a lightweight idempotency layer you can use inside Temporal activities, or standalone.
Can it prevent double Stripe charges/email sends/any other agent action?
It is designed to prevent duplicates for identical calls by replaying the recorded result instead of re-executing the handler. For critical side effects, also pass the downstream provider's idempotency key (for example, Stripe's idempotency_key).
What is workflow_id?
A scope boundary for idempotency. Same (workflow_id, tool, args) = same effect. Different workflow_id = independent effects, even with identical tool+args. You can use this, for example, if an agent is invoked via a webhook to deduplicate all side effects across multiple retries by passing the webhook's id as the workflow_id.
Important: In multi-tenant systems, include the tenant/user identifier in workflow_id (e.g., "tenant-123:order-456") to prevent unintended deduplication across different actors or security boundaries.
Core API
| Method | Purpose |
|---|---|
run(call, handler) |
Execute with idempotency—the main entry point |
begin(call) / commit(id, outcome) |
Manual transaction control |
approve(key) / deny(key) |
Human-in-the-loop approval |
get_effect(id) / find_by_idem_key(key) |
Query ledger state |
See ledger.py for full API with type signatures.
Configuration
Control polling, timeouts, and stale effect handling:
from agent_ledger import RunOptions, ConcurrencyOptions, StaleOptions # Per-call configuration await ledger.run( call, handler=my_handler, run_options=RunOptions( concurrency=ConcurrencyOptions( effect_timeout_s=30.0, # How long to wait for another worker to finish (default: 30s) approval_timeout_s=None, # How long to wait for human approval (default: None = indefinite) initial_interval_s=0.05, # First poll interval (default: 50ms) max_interval_s=1.0, # Max poll interval after backoff (default: 1s) backoff_multiplier=1.5, # Poll interval multiplier each retry (default: 1.5x) jitter_factor=0.3, # Random jitter to prevent thundering herd (default: 0.3) ), stale=StaleOptions( after_ms=60_000, # Take over PROCESSING effects older than this (default: 0 = disabled) ), ), ) # Global defaults (applied to all run() calls unless overridden) from agent_ledger import EffectLedger, EffectLedgerOptions, LedgerDefaults ledger = EffectLedger( EffectLedgerOptions( store=store, defaults=LedgerDefaults( run=RunOptions( concurrency=ConcurrencyOptions(effect_timeout_s=60.0), ), ), ), )
Polling behavior: run() polls with exponential backoff (50ms initial, 1.5x growth, 1s max, 30% jitter).
- Concurrent workers: If another worker is already processing the same effect, wait up to
effect_timeout_s(default: 30s) for them to finish - Human approval: Wait up to
approval_timeout_sfor approval (default:None= wait indefinitely)
License
Apache-2.0
Disclaimer: This library is designed to help reduce duplicate executions through idempotency patterns. It does not guarantee exactly-once semantics in all failure scenarios—correct behavior depends on proper integration, idempotent handlers, and appropriate storage configuration. The authors are not liable for any damages arising from the use of this software. Always test thoroughly before deploying to production. See LICENSE for full terms.
