FastScheduler
Simple, lightweight task scheduler for Python with async support, timezone handling, cron expressions, and a beautiful real-time dashboard.
If this saves you time, ⭐️ the repo and open an issue with ideas. I'm actively improving it.
Features
- 🎯 Simple decorator-based API - Schedule tasks in one line
- ⚡ Async/await support - Native async function support
- 🕐 Timezone support - Schedule jobs in any timezone
- 📅 Cron expressions - Complex schedules with cron syntax
- 💾 Persistent state - Survives restarts, handles missed jobs
- 🎨 FastAPI dashboard - Beautiful real-time monitoring UI
- 🔄 Automatic retries - Configurable retry with exponential backoff
- ⏱️ Job timeouts - Kill long-running jobs automatically
- ⏸️ Pause/Resume - Control jobs without removing them
- 📋 Dead Letter Queue - Track and debug failed jobs
Installation

    # Basic installation
    pip install fastscheduler

    # With FastAPI dashboard
    pip install fastscheduler[fastapi]

    # With cron expression support
    pip install fastscheduler[cron]

    # All features
    pip install fastscheduler[all]

Quick Start

    from fastscheduler import FastScheduler

    scheduler = FastScheduler(quiet=True)

    @scheduler.every(10).seconds
    def task():
        print("Task executed")

    @scheduler.daily.at("14:30")
    async def daily_task():
        print("Daily task at 2:30 PM")

    scheduler.start()

Scheduling Options
Interval-based

    @scheduler.every(10).seconds
    @scheduler.every(5).minutes
    @scheduler.every(2).hours
    @scheduler.every(1).days

Time-based

    @scheduler.daily.at("09:00")            # Daily at 9 AM
    @scheduler.hourly.at(":30")             # Every hour at :30
    @scheduler.weekly.monday.at("10:00")    # Every Monday at 10 AM
    @scheduler.weekly.weekdays.at("09:00")  # Weekdays at 9 AM
    @scheduler.weekly.weekends.at("12:00")  # Weekends at noon

Cron Expressions
Requires: pip install fastscheduler[cron]

    @scheduler.cron("0 9 * * MON-FRI")  # 9 AM on weekdays
    def market_open():
        ...

    @scheduler.cron("*/15 * * * *")  # Every 15 minutes
    def frequent_check():
        ...

    @scheduler.cron("0 0 1 * *")  # First day of each month
    def monthly_report():
        ...

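
The expressions above follow the standard five-field cron layout (minute, hour, day of month, month, day of week). As a reading aid, here is a small sketch that labels each field. `describe_cron` is a hypothetical helper written for this README, not part of FastScheduler, and it only splits the expression without validating the values.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 9 * * MON-FRI")
print(fields["hour"], fields["day_of_week"])  # 9 MON-FRI
```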
One-time Jobs

    @scheduler.once(60)  # Run once after 60 seconds
    def delayed_task():
        ...

    @scheduler.at("2024-12-25 00:00:00")  # Run at specific datetime
    def christmas_task():
        ...

Timezone Support
Schedule jobs in any timezone:

    # Using the tz parameter
    @scheduler.daily.at("09:00", tz="America/New_York")
    def nyc_morning():
        print("Good morning, New York!")

    # Using the .tz() method (chainable)
    @scheduler.weekly.monday.tz("Europe/London").at("09:00")
    def london_standup():
        print("Monday standup")

    # With cron expressions
    @scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo")
    def tokyo_market():
        print("Tokyo market open")

Common timezones: UTC, America/New_York, America/Los_Angeles, Europe/London, Europe/Paris, Asia/Tokyo, Asia/Shanghai, Australia/Sydney
Job Control
Timeouts
Kill jobs that run too long:

    @scheduler.every(1).minutes.timeout(30)  # Kill if it runs longer than 30 seconds
    def quick_task():
        ...

    @scheduler.daily.at("02:00").timeout(3600)  # 1 hour max
    def nightly_backup():
        ...

Retries
Configure automatic retries on failure:

    @scheduler.every(5).minutes.retries(5)  # Retry up to 5 times
    def flaky_api_call():
        ...

Retries use exponential backoff (2s, 4s, 8s, 16s, ...).
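
The delay sequence above is consistent with doubling a 2-second base on each attempt. Here is a minimal sketch under that assumption. `backoff_delays` is a hypothetical helper for illustration, not a FastScheduler function, and the library may cap or adjust its delays in ways this README does not describe.

```python
def backoff_delays(max_retries: int, base: float = 2.0) -> list[float]:
    """Return the assumed wait in seconds before each retry attempt."""
    # Assumption: attempt 1 waits base**1, attempt 2 waits base**2, and so on.
    return [base ** attempt for attempt in range(1, max_retries + 1)]


delays = backoff_delays(5)
print(delays)       # [2.0, 4.0, 8.0, 16.0, 32.0]
print(sum(delays))  # 62.0
```

Under this assumption, a job configured with `.retries(5)` would spend about a minute waiting in total before its final attempt.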
Skip Catch-up
Don't run missed jobs after restart:

    @scheduler.every(1).hours.no_catch_up()
    def hourly_stats():
        ...

Pause, Resume, and Cancel

    # Pause a job (stays in queue but won't execute)
    scheduler.pause_job("job_0")

    # Resume a paused job
    scheduler.resume_job("job_0")

    # Cancel and remove a job
    scheduler.cancel_job("job_0")

    # Cancel all jobs with a specific function name
    scheduler.cancel_job_by_name("my_task")

FastAPI Integration
Add a beautiful real-time dashboard to your FastAPI app:

    from fastapi import FastAPI
    from fastscheduler import FastScheduler
    from fastscheduler.fastapi_integration import create_scheduler_routes

    app = FastAPI()
    scheduler = FastScheduler(quiet=True)

    # Add dashboard at /scheduler/
    app.include_router(create_scheduler_routes(scheduler))

    @scheduler.every(30).seconds
    def background_task():
        print("Background work")

    scheduler.start()

Dashboard Features
Access at http://localhost:8000/scheduler/
- Real-time updates via Server-Sent Events (SSE)
- Job table with status indicators, last 5 run results, and countdown timers
- Quick actions - Pause/Resume/Cancel directly from the UI
- Execution history tab with filtering and search
- Dead letter queue tab - view failed jobs with error details
- Statistics - Success rate, uptime, active jobs count
- Toast notifications - Alerts for job completions and failures
API Endpoints
| Endpoint | Method | Description |
|---|---|---|
| /scheduler/ | GET | Dashboard UI |
| /scheduler/api/status | GET | Scheduler status |
| /scheduler/api/jobs | GET | List all jobs |
| /scheduler/api/jobs/{job_id} | GET | Get specific job |
| /scheduler/api/jobs/{job_id}/pause | POST | Pause a job |
| /scheduler/api/jobs/{job_id}/resume | POST | Resume a job |
| /scheduler/api/jobs/{job_id}/cancel | POST | Cancel a job |
| /scheduler/api/history | GET | Execution history |
| /scheduler/api/dead-letters | GET | Dead letter queue (failed jobs) |
| /scheduler/api/dead-letters | DELETE | Clear dead letter queue |
| /scheduler/events | GET | SSE event stream |
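
The job-control endpoints can also be called from a script. The sketch below only builds the request with the standard library. `job_action_request` is a hypothetical helper, the base URL assumes the app is served on localhost:8000 as in the dashboard section, and the response format is not documented here, so treat the body as opaque.

```python
import urllib.request

BASE_URL = "http://localhost:8000/scheduler"  # assumed local dashboard address


def job_action_request(job_id: str, action: str) -> urllib.request.Request:
    """Build a POST request for the pause, resume, or cancel endpoint of a job."""
    if action not in {"pause", "resume", "cancel"}:
        raise ValueError(f"unsupported action: {action}")
    url = f"{BASE_URL}/api/jobs/{job_id}/{action}"
    return urllib.request.Request(url, method="POST")


req = job_action_request("job_0", "pause")
print(req.get_method(), req.full_url)
# With the app running, send it with: urllib.request.urlopen(req)
```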
Configuration

    scheduler = FastScheduler(
        state_file="scheduler.json",  # Persistence file (default: fastscheduler_state.json)
        quiet=True,                   # Suppress log messages (default: False)
        auto_start=False,             # Start immediately (default: False)
        max_history=5000,             # Max history entries to keep (default: 10000)
        max_workers=20,               # Concurrent job threads (default: 10)
        history_retention_days=8,     # Delete history older than X days (default: 7)
        max_dead_letters=500,         # Max failed jobs in dead letter queue (default: 500)
    )

History Retention
History is automatically cleaned up based on two limits (both are enforced):
- Count limit: max_history, the maximum number of entries
- Time limit: history_retention_days, the maximum age in days
Set history_retention_days=0 to disable time-based cleanup (only count limit applies).
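
To make the interaction of the two limits concrete, here is a minimal sketch of how such a cleanup could work. `prune_history` and the `timestamp` key are hypothetical names chosen for illustration. FastScheduler's internal record format and cleanup order are not documented in this README.

```python
from datetime import datetime, timedelta


def prune_history(entries, max_history, retention_days, now):
    """Apply the time limit, then the count limit, keeping the newest entries."""
    if retention_days > 0:  # 0 disables time-based cleanup
        cutoff = now - timedelta(days=retention_days)
        entries = [e for e in entries if e["timestamp"] >= cutoff]
    # Entries are assumed to be ordered oldest-first.
    return entries[-max_history:] if max_history > 0 else []


now = datetime(2024, 1, 10)
history = [{"timestamp": now - timedelta(days=d)} for d in (9, 5, 3, 1)]

kept = prune_history(history, max_history=2, retention_days=7, now=now)
print([(now - e["timestamp"]).days for e in kept])  # [3, 1]

kept_all = prune_history(history, max_history=10, retention_days=0, now=now)
print(len(kept_all))  # 4
```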
Dead Letter Queue
Failed job executions are automatically stored in a separate dead letter queue for debugging:

    # Get failed jobs
    dead_letters = scheduler.get_dead_letters(limit=100)

    # Clear the queue
    scheduler.clear_dead_letters()

The dead letter queue:
- Stores the last max_dead_letters failed jobs (default: 500)
- Persists to a separate JSON file (*_dead_letters.json)
- Includes error messages, timestamps, run counts, and execution times
- Viewable in the dashboard "Failed" tab
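
The "last N failures" behavior resembles a bounded queue that drops its oldest entry when full. Here is a sketch of that idea using `collections.deque`. The entry fields are hypothetical, and this is an analogy rather than FastScheduler's actual storage code.

```python
from collections import deque

max_dead_letters = 3  # small value for illustration; the documented default is 500
dead_letters = deque(maxlen=max_dead_letters)

for run in range(1, 6):
    # Each failed run is appended; once full, the oldest entry is discarded.
    dead_letters.append({"job": "flaky_api_call", "run": run, "error": "timeout"})

print([entry["run"] for entry in dead_letters])  # [3, 4, 5]
```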
Monitoring
Programmatic Access

    # Get all jobs
    jobs = scheduler.get_jobs()

    # Get specific job
    job = scheduler.get_job("job_0")

    # Get execution history
    history = scheduler.get_history(limit=100)
    history = scheduler.get_history(func_name="my_task", limit=50)

    # Get statistics
    stats = scheduler.get_statistics()
    # Returns: total_runs, total_failures, uptime, per_job stats

    # Print simple status to console
    scheduler.print_status()

Context Manager

    import time

    from fastscheduler import FastScheduler

    with FastScheduler(quiet=True) as scheduler:
        @scheduler.every(5).seconds
        def task():
            print("Running")

        # Scheduler starts automatically
        time.sleep(30)

    # Scheduler stops automatically on exit

State Persistence
FastScheduler automatically saves state to disk:
- Job definitions and schedules
- Execution history
- Statistics
- Job counter (ensures unique IDs across restarts)
On restart, it:
- Restores all jobs
- Calculates missed executions
- Runs catch-up jobs (unless no_catch_up() is set)
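
For an interval job, the missed-execution calculation can be pictured as stepping forward from the last recorded run until the current time. The sketch below assumes exactly that. `missed_runs` is a hypothetical helper, and this README does not say whether FastScheduler runs every missed execution or a single catch-up run.

```python
from datetime import datetime, timedelta


def missed_runs(last_run: datetime, interval: timedelta, now: datetime) -> list[datetime]:
    """List the scheduled times after last_run, up to and including now."""
    missed = []
    next_run = last_run + interval
    while next_run <= now:
        missed.append(next_run)
        next_run += interval
    return missed


last_run = datetime(2024, 1, 1, 9, 0)
restart = datetime(2024, 1, 1, 12, 30)
missed = missed_runs(last_run, timedelta(hours=1), restart)
print([t.strftime("%H:%M") for t in missed])  # ['10:00', '11:00', '12:00']
```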
Examples
Complete Example

    import asyncio
    import time

    from fastscheduler import FastScheduler

    scheduler = FastScheduler(quiet=True)

    # Simple interval job
    @scheduler.every(10).seconds
    def heartbeat():
        print(f"[{time.strftime('%H:%M:%S')}] ❤️ Heartbeat")

    # Async job with timezone
    @scheduler.daily.at("09:00", tz="America/New_York").timeout(60)
    async def morning_report():
        print("Generating report...")
        await asyncio.sleep(5)
        print("Report sent!")

    # Cron job with retries
    @scheduler.cron("*/5 * * * *").retries(3)
    def check_api():
        print("Checking API health")

    # Weekly job
    @scheduler.weekly.monday.at("10:00")
    def weekly_standup():
        print("Time for standup!")

    # Start scheduler
    scheduler.start()

    try:
        while True:
            time.sleep(60)
            scheduler.print_status()
    except KeyboardInterrupt:
        scheduler.stop()

FastAPI with Lifespan

    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from fastscheduler import FastScheduler
    from fastscheduler.fastapi_integration import create_scheduler_routes

    scheduler = FastScheduler(quiet=True)

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler.start()
        yield
        scheduler.stop(wait=True)

    app = FastAPI(lifespan=lifespan)
    app.include_router(create_scheduler_routes(scheduler))

    @scheduler.every(30).seconds
    def background_job():
        print("Working...")

API Reference
FastScheduler
| Method | Description |
|---|---|
| start() | Start the scheduler |
| stop(wait=True, timeout=30) | Stop gracefully |
| get_jobs() | List all scheduled jobs |
| get_job(job_id) | Get specific job by ID |
| get_history(func_name=None, limit=50) | Get execution history |
| get_statistics() | Get runtime statistics |
| get_dead_letters(limit=100) | Get dead letter queue (failed jobs) |
| clear_dead_letters() | Clear all dead letter entries |
| pause_job(job_id) | Pause a job |
| resume_job(job_id) | Resume a paused job |
| cancel_job(job_id) | Cancel and remove a job |
| cancel_job_by_name(func_name) | Cancel all jobs by function name |
| print_status() | Print status to console |
Scheduler Methods
| Method | Description |
|---|---|
| every(n).seconds/minutes/hours/days | Interval scheduling |
| daily.at("HH:MM") | Daily at specific time |
| hourly.at(":MM") | Hourly at specific minute |
| weekly.monday/tuesday/.../sunday.at("HH:MM") | Weekly scheduling |
| weekly.weekdays/weekends.at("HH:MM") | Weekday/weekend scheduling |
| cron("expression") | Cron expression scheduling |
| once(seconds) | One-time delayed execution |
| at("YYYY-MM-DD HH:MM:SS") | One-time at specific datetime |
Chainable Modifiers
| Modifier | Description |
|---|---|
| .timeout(seconds) | Maximum execution time |
| .retries(n) | Maximum retry attempts |
| .no_catch_up() | Skip missed executions |
| .tz("timezone") | Set timezone for schedule |
License
MIT
Contributing
Contributions welcome! Please open an issue or PR on GitHub.

