GitHub - neul-labs/fastworker: Add background workers to your FastAPI and Python applications in seconds, without the complexity.

5 min read Original article ↗

FastWorker

A brokerless task queue for Python applications with automatic worker discovery, priority handling, and built-in management GUI.

No Redis. No RabbitMQ. Just Python.

PyPI version License: MIT

Why FastWorker?

Traditional task queues (Celery + Redis) require deploying and managing 4-6+ separate services:

  • Your application
  • Redis broker
  • Celery workers
  • Redis result backend
  • Optional: Flower monitoring
  • Optional: Redis Sentinel for HA

FastWorker requires just 2-3 Python processes:

  • Your application
  • FastWorker control plane (with built-in web UI)
  • FastWorker workers (optional, for scaling)

That's it. No external dependencies. No Redis to configure, monitor, backup, or secure. Just Python.

Features

  • Brokerless Architecture - No Redis, RabbitMQ, or other message brokers required
  • Control Plane Architecture - Centralized coordination with distributed subworkers
  • Built-in Management GUI - Real-time web dashboard for monitoring workers, queues, and tasks
  • Automatic Worker Discovery - Workers find each other automatically on the network
  • Priority Queues - Support for critical, high, normal, and low priority tasks
  • Result Caching - Task results cached with expiration and memory limits
  • Task Completion Callbacks - Receive real-time notifications when tasks complete
  • Built-in Reliability - Automatic retries and error handling
  • FastAPI Integration - Seamless integration with web applications
  • OpenTelemetry Support - Optional distributed tracing and metrics for observability
  • Zero Configuration - Works out of the box with sensible defaults

Note: FastWorker is designed for moderate-scale Python applications (1K-10K tasks/min). For extreme scale, multi-language support, or complex workflows, see Limitations & Scope.

Installation

Quick Start

1. Define Tasks

# mytasks.py
from fastworker import task

@task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@task
def multiply(x: int, y: int) -> int:
    """Multiply two numbers."""
    return x * y

2. Start Control Plane

# Terminal 1 - Start the control plane (coordinates and also processes tasks)
fastworker control-plane --task-modules mytasks

The control plane starts with a built-in management GUI at http://127.0.0.1:8080

3. Start Subworkers (Optional - for scaling)

# Terminal 2 - Start subworker 1
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5561 --task-modules mytasks

# Terminal 3 - Start subworker 2 (optional)
fastworker subworker --worker-id subworker2 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5565 --task-modules mytasks

4. Submit Tasks

Blocking mode (wait for result):

fastworker submit --task-name add --args 5 3

Non-blocking mode (get task ID immediately):

fastworker submit --task-name add --args 5 3 --non-blocking
# Returns: Task ID: <uuid>

Check task status:

fastworker status --task-id <uuid>

5. Using Python Client

Non-blocking (recommended):

from fastworker import Client
import asyncio

async def main():
    client = Client()
    await client.start()

    # Non-blocking: Returns immediately with task ID
    task_id = await client.delay("add", 5, 3)
    print(f"Task submitted: {task_id}")

    # Check result later
    result = await client.get_task_result(task_id)
    if result:
        print(f"Result: {result.result}")

    client.stop()

asyncio.run(main())

Blocking (when you need the result immediately):

# Blocking: Waits for result
result = await client.submit_task("add", args=(5, 3))
print(f"Result: {result.result}")

Architecture

FastWorker uses a Control Plane Architecture:

  • Control Plane Worker: Central coordinator that manages subworkers and also processes tasks
  • Subworkers: Additional workers that register with the control plane for load distribution
  • Clients: Connect only to the control plane for task submission

Benefits

  • Centralized Management: Control plane coordinates all task distribution
  • Load Balancing: Tasks automatically distributed to least-loaded subworkers
  • High Availability: Control plane processes tasks if no subworkers available
  • Result Persistence: Results cached in control plane with expiration
  • Scalability: Add subworkers dynamically without reconfiguration

CLI Usage

# Start control plane
fastworker control-plane --worker-id control-plane --task-modules mytasks

# Start subworker
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --task-modules mytasks

# Submit task (blocking)
fastworker submit --task-name add --args 5 3

# Submit task (non-blocking)
fastworker submit --task-name add --args 5 3 --non-blocking

# Check task status
fastworker status --task-id <uuid>

# List available tasks
fastworker list --task-modules mytasks

Management GUI

FastWorker includes a built-in web dashboard that starts automatically with the control plane. No additional setup required.

Access the GUI at: http://127.0.0.1:8080 (default)

Features

  • Real-time Monitoring - Auto-refreshes every 5 seconds
  • Worker Status - View active/inactive subworkers with load metrics
  • Queue Visualization - See task counts by priority (critical, high, normal, low)
  • Task History - Browse cached task results with status and timing
  • Cache Statistics - Monitor result cache utilization and TTL

Configuration

# Default: GUI enabled on 127.0.0.1:8080
fastworker control-plane --task-modules mytasks

# Custom host/port (e.g., for remote access)
fastworker control-plane --gui-host 0.0.0.0 --gui-port 9000 --task-modules mytasks

# Disable GUI entirely
fastworker control-plane --no-gui --task-modules mytasks

Environment Variables

  • FASTWORKER_GUI_ENABLED - Enable/disable GUI (default: true)
  • FASTWORKER_GUI_HOST - GUI server host (default: 127.0.0.1)
  • FASTWORKER_GUI_PORT - GUI server port (default: 8080)

Priority Handling

from fastworker.tasks.models import TaskPriority

# Submit with priority
await client.delay("critical_task", priority=TaskPriority.CRITICAL)
await client.delay("normal_task", priority=TaskPriority.NORMAL)

Result Caching

The control plane maintains a result cache with:

  • Configurable Size: Default 10,000 results (configurable via --result-cache-size)
  • TTL: Default 1 hour (configurable via --result-cache-ttl)
  • LRU Eviction: Least recently accessed results evicted when cache is full
  • Automatic Cleanup: Expired results cleaned up every minute

Configuration

Control Plane

fastworker control-plane \
  --worker-id control-plane \
  --base-address tcp://127.0.0.1:5555 \
  --discovery-address tcp://127.0.0.1:5550 \
  --result-cache-size 10000 \
  --result-cache-ttl 3600 \
  --gui-host 127.0.0.1 \
  --gui-port 8080 \
  --task-modules mytasks

Subworker

fastworker subworker \
  --worker-id subworker1 \
  --control-plane-address tcp://127.0.0.1:5555 \
  --base-address tcp://127.0.0.1:5561 \
  --task-modules mytasks

Client

client = Client(
    discovery_address="tcp://127.0.0.1:5550",
    timeout=60,
    retries=5
)

Development

# Clone repository
git clone https://github.com/dipankar/fastworker.git
cd fastworker

# Install dependencies
uv sync

# Run tests
uv run pytest

# Format code
uv run black .

Requirements

  • Python 3.12+
  • pynng
  • pydantic

License

MIT License - see LICENSE file for details.

Documentation

For detailed documentation, see:

Contributing

Contributions welcome! Please read CONTRIBUTING.md for guidelines.