# Architecture

## Overview
```mermaid
flowchart TB
    subgraph app["Your Application"]
        enqueue["pq.enqueue(task, ...)"]
        schedule["pq.schedule(task, ...)"]
    end

    subgraph pg["PostgreSQL"]
        tasks[("pq_tasks<br/>(one-off)")]
        periodic[("pq_periodic<br/>(recurring)")]
    end

    subgraph worker["Worker Process"]
        parent["Parent"]
        child["Child Process<br/>(isolated memory)"]
        parent -->|"fork()"| child
        child -->|"execute task"| child
        child -->|"exit"| parent
        parent -->|"wait4()"| parent
    end

    enqueue --> tasks
    schedule --> periodic
    tasks -->|"FOR UPDATE<br/>SKIP LOCKED"| parent
    periodic -->|"FOR UPDATE<br/>SKIP LOCKED"| parent
```
## Components

### PQ Client

The main interface for your application:
```python
from datetime import timedelta

from pq import PQ

pq = PQ("postgresql://localhost/mydb")

# Enqueue one-off tasks
pq.enqueue(send_email, to="user@example.com")

# Schedule recurring tasks
pq.schedule(cleanup, run_every=timedelta(hours=1))
```
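A worker process, started separately (for example, one per container or host), claims and executes these tasks; worker options such as hooks and priorities are covered later in this document:

```python
# Start a worker loop that polls the tables and executes tasks (blocks the process)
pq.run_worker()
```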
### PostgreSQL Tables

`pq_tasks` - One-off tasks:

| Column | Purpose |
|---|---|
| `name` | Function path (e.g., `myapp.tasks:send_email`) |
| `payload` | JSONB with serialized args/kwargs |
| `priority` | 0-100, higher runs first |
| `status` | pending → running → completed/failed |
| `run_at` | Scheduled execution time |
| `error` | Error message if failed |

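For example, a hypothetical inspection query against these columns shows what is waiting, in the order a worker would claim it:

```sql
-- Pending tasks in claim order (sketch; assumes the columns listed above)
SELECT name, priority, run_at
FROM pq_tasks
WHERE status = 'pending'
ORDER BY priority DESC, run_at;
```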
`pq_periodic` - Recurring schedules:

| Column | Purpose |
|---|---|
| `name` | Function path (unique with `key`) |
| `key` | Discriminator for multiple schedules of the same function (default `""`) |
| `payload` | JSONB with serialized args/kwargs |
| `priority` | 0-100, higher runs first |
| `run_every` | Interval (e.g., 1 hour) |
| `cron` | Cron expression (e.g., `0 9 * * 1`) |
| `active` | Whether the task is executed (default `true`) |
| `next_run` | Next scheduled execution |

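As a sketch of how these columns map to `schedule()` calls, assuming `key=` and `cron=` keyword arguments that mirror the columns above (`send_digest` is a hypothetical task):

```python
# Two schedules of the same function, distinguished by `key`
# (key= and cron= are assumed keyword arguments mirroring the table columns)
pq.schedule(send_digest, key="daily", cron="0 9 * * *")
pq.schedule(send_digest, key="weekly", cron="0 9 * * 1")
```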
## Fork Isolation

Every task runs in an isolated child process:
```mermaid
flowchart LR
    subgraph parent["Worker (Parent)"]
        poll["Poll for task"]
        fork["fork()"]
        wait["wait4()"]
        update["Update status"]
    end

    subgraph child["Child Process"]
        setup["Set timeout"]
        pre["pre_execute()"]
        exec["Execute handler"]
        post["post_execute()"]
        exit["exit()"]
    end

    poll --> fork
    fork --> setup
    setup --> pre
    pre --> exec
    exec --> post
    post --> exit
    exit -.->|"exit code"| wait
    wait --> update
    update --> poll
```
The parent monitors the child via `wait4()` and detects:

| Exit Condition | Detection | Result |
|---|---|---|
| Success | Exit code 0 | COMPLETED |
| Exception | Exit code 1 + pipe message | FAILED with error |
| Timeout | Exit code 124 or SIGALRM | FAILED with timeout |
| OOM | SIGKILL (signal 9) | FAILED with OOM |
| Crash | Other signals | FAILED with signal |
Benefits:
- Memory isolation - Child OOM doesn't crash worker
- Crash isolation - Segfaults only kill the child
- Clean state - Each task starts fresh
- Hook safety - Lifecycle hooks run in the child, safe for fork-unsafe resources
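As a rough illustration of the detection table above, the parent-side fork-and-wait logic can be sketched with `os.fork()` and `os.wait4()` (a simplified sketch, not pq's actual worker code):

```python
import os
import signal

def run_isolated(handler) -> str:
    """Fork, run the handler in the child, and classify the outcome."""
    pid = os.fork()
    if pid == 0:  # child
        try:
            handler()
            os._exit(0)  # success
        except Exception:
            os._exit(1)  # exception (details would be reported over a pipe)
    _, status, _rusage = os.wait4(pid, 0)  # parent blocks until the child exits
    if os.WIFSIGNALED(status):
        sig = os.WTERMSIG(status)
        return "FAILED (OOM)" if sig == signal.SIGKILL else f"FAILED (signal {sig})"
    code = os.WEXITSTATUS(status)
    if code == 0:
        return "COMPLETED"
    if code == 124:
        return "FAILED (timeout)"
    return "FAILED (error)"
```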
## Lifecycle Hooks

Optional `pre_execute` and `post_execute` hooks run in the forked child:
```python
from pq import Task, Periodic

def pre_execute(task: Task | Periodic) -> None:
    # Initialize fork-unsafe resources (OTel, DB connections)
    pass

def post_execute(task: Task | Periodic, error: Exception | None) -> None:
    # Cleanup/flush (OTel traces, etc.)
    pass

pq.run_worker(pre_execute=pre_execute, post_execute=post_execute)
```
This is essential for OpenTelemetry and similar libraries that don't survive `fork()`.
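For example, a sketch of keeping a fork-unsafe SQLAlchemy engine safe with these hooks (the module-level `engine` here is a hypothetical piece of your application, not part of pq):

```python
from sqlalchemy import create_engine

engine = create_engine("postgresql://localhost/mydb")

def pre_execute(task):
    # Discard pooled connections inherited from the parent without closing
    # the parent's sockets (close=False requires SQLAlchemy >= 1.4.33)
    engine.dispose(close=False)

pq.run_worker(pre_execute=pre_execute)
```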
## Concurrency

Multiple workers safely process tasks in parallel using PostgreSQL's `FOR UPDATE SKIP LOCKED`:
```mermaid
flowchart LR
    subgraph workers["Workers"]
        w1["Worker 1"]
        w2["Worker 2"]
        w3["Worker 3"]
    end

    subgraph db["pq_tasks"]
        t1["Task A 🔒 W1"]
        t2["Task B 🔒 W2"]
        t3["Task C 🔒 W3"]
        t4["Task D ⏳"]
        t5["Task E ⏳"]
    end

    w1 -.-> t1
    w2 -.-> t2
    w3 -.-> t3
```
```sql
SELECT * FROM pq_tasks
WHERE status = 'pending' AND run_at <= now()
ORDER BY priority DESC, run_at
FOR UPDATE SKIP LOCKED
LIMIT 1;
```
- `FOR UPDATE` locks the selected row
- `SKIP LOCKED` skips rows locked by other workers
- Each task runs exactly once
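To see the effect, run the claim query from two sessions at once (sketch):

```sql
-- Session A (worker 1): claims the highest-priority pending task, e.g. Task A
BEGIN;
SELECT * FROM pq_tasks
WHERE status = 'pending' AND run_at <= now()
ORDER BY priority DESC, run_at
FOR UPDATE SKIP LOCKED
LIMIT 1;

-- Session B (worker 2), while A's transaction is open: the row locked by A
-- is skipped, so B immediately gets the next pending task (e.g. Task B)
-- instead of blocking or double-claiming.
```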
## Priority System

Tasks are processed in priority order (higher first):

| Priority | Value | Use Case |
|---|---|---|
| `CRITICAL` | 100 | User-facing, time-sensitive |
| `HIGH` | 75 | Important but less urgent |
| `NORMAL` | 50 | Default |
| `LOW` | 25 | Background work |
| `BATCH` | 0 | Bulk operations |

Dedicated workers can handle specific priorities:

```python
from pq import Priority

# High-priority worker
pq.run_worker(priorities={Priority.CRITICAL, Priority.HIGH})

# Batch worker
pq.run_worker(priorities={Priority.BATCH})
```
## Serialization

```mermaid
flowchart TD
    input["args, kwargs"]
    json{"JSON-safe?"}
    pydantic{"Pydantic?"}
    dill["dill encode"]
    payload["JSONB payload"]

    input --> json
    json -->|"Yes"| payload
    json -->|"No"| pydantic
    pydantic -->|"Yes: model_dump()"| payload
    pydantic -->|"No"| dill
    dill --> payload
```

| Type | Method |
|---|---|
| str, int, list, dict, None | JSON |
| Pydantic models | model_dump() → JSON |
| Lambdas, closures, custom objects | dill (pickle-like) |
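A sketch of how each path gets exercised (`Report` and `generate_report` are hypothetical names used only for illustration):

```python
from pydantic import BaseModel

class Report(BaseModel):
    user_id: int
    format: str

pq.enqueue(send_email, to="user@example.com")                        # plain JSON types
pq.enqueue(generate_report, report=Report(user_id=1, format="pdf"))  # Pydantic -> model_dump()
pq.enqueue(lambda: print("ad-hoc job"))                              # falls back to dill
```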
## Task Lifecycle

```mermaid
stateDiagram-v2
    [*] --> PENDING: enqueue()
    PENDING --> RUNNING: worker claims
    RUNNING --> COMPLETED: success
    RUNNING --> FAILED: error/timeout/OOM
```
## File Structure

```
src/pq/
├── __init__.py       # Public API exports
├── client.py         # PQ class - main interface
├── worker.py         # Worker loop and fork execution
├── models.py         # SQLAlchemy models (Task, Periodic)
├── priority.py       # Priority enum
├── serialization.py  # JSON/Pydantic/dill serialization
├── registry.py       # Function path resolution
└── logging.py        # Loguru configuration
```