Skip to content

Examples

Runnable examples live in the examples/ directory.

Basic Usage

examples/basic.py - Enqueue and process a task.

from pq import PQ

# Queue client constructed from the URL of the local "mydb" Postgres database
pq = PQ("postgresql://localhost/mydb")
# NOTE(review): presumably creates the tables the queue needs — confirm against the pq API
pq.create_tables()

def send_email(to: str, subject: str) -> None:
    """Print a one-line notice naming the recipient and the subject.

    Args:
        to: Recipient address shown in the notice.
        subject: Subject line shown in the notice.
    """
    notice = f"Sending email to {to}: {subject}"
    print(notice)

# Queue a send_email call together with the keyword arguments it should receive
pq.enqueue(send_email, to="user@example.com", subject="Hello!")
# NOTE(review): presumably processes one pending task and then returns — confirm against the pq API
pq.run_worker_once()

Periodic Tasks

examples/periodic.py - Scheduled and cron-based tasks.

from datetime import timedelta
from pq import PQ

# Queue client for the local "mydb" database URL; the schedules below are registered through it
pq = PQ("postgresql://localhost/mydb")

def heartbeat() -> None:
    """Print a fixed liveness marker."""
    marker = "alive"
    print(marker)

# Run every 5 minutes
pq.schedule(heartbeat, run_every=timedelta(minutes=5))

# Run at 9am on Mondays (cron fields: minute=0, hour=9, day-of-week=1)
# NOTE(review): both calls schedule heartbeat with no key; judging by the
# pause/resume example on this page, the second call may update the first
# schedule rather than add another — confirm against the pq API
pq.schedule(heartbeat, cron="0 9 * * 1")

# Multiple schedules for the same function using key
def sync_data(region: str) -> None:
    """Print which region is being synced.

    Args:
        region: Region label included in the printed line.
    """
    status_line = f"Syncing {region}"
    print(status_line)

# Two schedules for the same function, told apart by key (hourly "us", two-hourly "eu").
# NOTE(review): region=... is presumably forwarded to sync_data as its argument — confirm
pq.schedule(sync_data, run_every=timedelta(hours=1), key="us", region="us")
pq.schedule(sync_data, run_every=timedelta(hours=2), key="eu", region="eu")

# Remove only one schedule (the one registered with key="us")
pq.unschedule(sync_data, key="us")

# Remove default (no key) schedule
pq.unschedule(heartbeat)

Pausing & Resuming Periodic Tasks

Disable a periodic task without removing it, then re-enable it later:

from datetime import timedelta
from pq import PQ

# Queue client for the local "mydb" database URL, used for every schedule() call below
pq = PQ("postgresql://localhost/mydb")

def sync_inventory() -> None:
    """Print a progress marker for the inventory sync."""
    progress = "syncing..."
    print(progress)

# Schedule as usual (active by default)
pq.schedule(sync_inventory, run_every=timedelta(minutes=5))

# Pause - keeps the schedule but the worker won't run it.
# Same function and interval as the call above; only active differs.
pq.schedule(sync_inventory, run_every=timedelta(minutes=5), active=False)

# Resume - the same call again, this time with active=True
pq.schedule(sync_inventory, run_every=timedelta(minutes=5), active=True)

Priority Queues

examples/priority.py - Task prioritization.

from pq import PQ, Priority

# Queue client for the local "mydb" database URL; the prioritized tasks below are enqueued on it
pq = PQ("postgresql://localhost/mydb")

def process(name: str) -> None:
    """Print the name of the item being processed.

    Args:
        name: Label included in the printed line.
    """
    report = f"Processing: {name}"
    print(report)

# Higher priority tasks run first.
# The trailing number on each line is that level's value as given on this page
# (presumably the numeric value of the Priority member — confirm).
# The string is passed positionally, presumably reaching process() as name.
pq.enqueue(process, "batch", priority=Priority.BATCH)      # 0
pq.enqueue(process, "normal", priority=Priority.NORMAL)    # 50
pq.enqueue(process, "urgent", priority=Priority.CRITICAL)  # 100

Async Tasks

examples/async_tasks.py - Async handler support.

import asyncio
from pq import PQ

# Queue client for the local "mydb" database URL; the async handler below is enqueued on it
pq = PQ("postgresql://localhost/mydb")

async def fetch_data(url: str) -> None:
    """Announce a fetch, wait one second asynchronously, then report completion.

    Args:
        url: Address shown in the "Fetching" line; no request is made here.
    """
    announcement = f"Fetching {url}..."
    print(announcement)
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)
    completion = "Done"
    print(completion)

# The async handler is enqueued with the same call shape as a regular function
pq.enqueue(fetch_data, url="https://api.example.com")
# NOTE(review): presumably the worker runs the coroutine to completion — confirm against the pq API
pq.run_worker_once()

Client IDs

Use client_id for custom identifiers to link tasks to your domain objects.

from pq import PQ

# Queue client for the local "mydb" database URL, used for the client_id calls below
pq = PQ("postgresql://localhost/mydb")

def process_order(order_id: int) -> None:
    """Print which order is being processed.

    Args:
        order_id: Identifier included in the printed line.
    """
    report = f"Processing order {order_id}"
    print(report)

# Enqueue with a client-provided ID
pq.enqueue(process_order, order_id=123, client_id="order-123")

# Look up by client_id. The lookup result is checked for truthiness before
# use, so no attribute of it is read when nothing was found.
task = pq.get_task_by_client_id("order-123")
if task:
    print(f"Task status: {task.status}")

    # Cancel the task located via its client_id (cancel() is given task.id)
    pq.cancel(task.id)

# Duplicate client_id raises IntegrityError
# pq.enqueue(process_order, order_id=456, client_id="order-123")  # Error!

Upsert

Use upsert() to insert or update a task by client_id. Unlike enqueue(), it won't raise on duplicates - it updates the existing task instead.

from pq import PQ

# Queue client for the local "mydb" database URL, used for the upsert() calls below
pq = PQ("postgresql://localhost/mydb")

def send_notification(user_id: int, message: str) -> None:
    """Print a notification line for one user.

    Args:
        user_id: Identifier of the user named in the line.
        message: Text shown after the user identifier.
    """
    notice = f"Notifying user {user_id}: {message}"
    print(notice)

# First call creates the task
pq.upsert(send_notification, user_id=42, message="Hello", client_id="notify-42")

# Second call updates the existing task (resets status to PENDING)
pq.upsert(send_notification, user_id=42, message="Updated!", client_id="notify-42")

# Useful for debouncing - only the latest version runs.
# Every iteration reuses client_id "notify-42", so each call targets the same task.
for i in range(100):
    pq.upsert(send_notification, user_id=42, message=f"Message {i}", client_id="notify-42")
# Only the last message ("Message 99") will be processed

Worker Lifecycle Hooks

Use the pre_execute and post_execute hooks to run code before and after each task executes. Hooks run in the forked child process, which makes them a good place to set up fork-unsafe resources such as OpenTelemetry.

from pq import PQ, Task, Periodic

# Queue client for the local "mydb" database URL; its worker is started with hooks further down
pq = PQ("postgresql://localhost/mydb")

def setup_otel(task: Task | Periodic) -> None:
    """Pre-execution hook: announce the span for the task about to run.

    Intended to run in the forked child before the task executes.

    Args:
        task: The task being started; only its name is read here.
    """
    # Placeholder for real setup: initialize the tracer, create a span, etc.
    span_notice = f"[OTel] Starting span for {task.name}"
    print(span_notice)

def flush_otel(task: Task | Periodic, error: Exception | None) -> None:
    """Post-execution hook: report any error, then announce the trace flush.

    Runs after the task whether it succeeded or failed.

    Args:
        task: The task that just ran; only its name is read here.
        error: The failure, if any; a falsy value skips the error line.
    """
    if error:
        error_notice = f"[OTel] Recording error: {error}"
        print(error_notice)
    flush_notice = f"[OTel] Flushing traces for {task.name}"
    print(flush_notice)

# Run worker with hooks: setup_otel is passed as the pre-execution hook,
# flush_otel as the post-execution hook
pq.run_worker(pre_execute=setup_otel, post_execute=flush_otel)

Hooks receive the full task object with metadata:

def pre_hook(task: Task | Periodic) -> None:
    """Print the metadata fields available on the task handed to a hook.

    Args:
        task: The task object; its id, name, client_id and priority are read
            in that order.
    """
    id_line = f"Task ID: {task.id}"
    print(id_line)
    name_line = f"Task name: {task.name}"
    print(name_line)
    client_line = f"Client ID: {task.client_id}"
    print(client_line)
    priority_line = f"Priority: {task.priority}"
    print(priority_line)

Error Handling

examples/error_handling.py - Failures and cleanup.

from datetime import UTC, datetime, timedelta
from pq import PQ

# Queue client for the local "mydb" database URL
pq = PQ("postgresql://localhost/mydb")

# Check failed tasks: print each one's name and recorded error
for task in pq.list_failed():
    print(f"{task.name}: {task.error}")

# Cleanup old tasks.
# The cutoff is a timezone-aware UTC timestamp seven days before now.
week_ago = datetime.now(UTC) - timedelta(days=7)
# NOTE(review): presumably removes failed/completed tasks older than the cutoff — confirm against the pq API
pq.clear_failed(before=week_ago)
pq.clear_completed(before=week_ago)

Running Examples

# Start Postgres (through the project's `make dev` target)
make dev

# Run any example (each script is launched with `uv run`)
uv run examples/basic.py
uv run examples/periodic.py
uv run examples/priority.py
uv run examples/async_tasks.py
uv run examples/error_handling.py