API Reference
PQ
Postgres-backed task queue client.
__init__(database_url)
Initialize PQ with database connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `database_url` | `str` | PostgreSQL connection string. | required |
run_db_migrations()
Run database migrations to latest version.
Call this once at application startup before using the queue. It uses Alembic to apply any pending migrations and is safe to call multiple times, because only pending migrations are applied.
Example
```python
pq = PQ("postgresql://localhost/mydb")
pq.run_db_migrations()
```
enqueue(task, *args, run_at=None, priority=Priority.NORMAL, client_id=None, **kwargs)
Enqueue a one-off task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | Function to execute. | required |
| `*args` | `Any` | Positional arguments to pass to the handler. | `()` |
| `run_at` | `datetime \| None` | When to run the task. Defaults to now. | `None` |
| `priority` | `Priority` | Task priority. Higher values mean higher priority. Defaults to NORMAL. | `NORMAL` |
| `client_id` | `str \| None` | Optional client-provided identifier. Must be unique if provided. | `None` |
| `**kwargs` | `Any` | Keyword arguments to pass to the handler. | `{}` |

Returns:
| Type | Description |
|---|---|
| `int` | Task ID. |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If task is a lambda, closure, or cannot be imported. |
| `IntegrityError` | If client_id already exists. |
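The `ValueError` condition above implies that a worker must be able to import the task again by name. The sketch below shows one way such a check could look. `is_importable_task` and `make_closure` are hypothetical names used only for illustration; they are not part of PQ, and the library's actual validation may differ.

```python
import importlib
import json
from typing import Any, Callable


def is_importable_task(task: Callable[..., Any]) -> bool:
    """Hypothetical check: can `task` be found again via its module and qualified name?"""
    module_name = getattr(task, "__module__", None)
    qualname = getattr(task, "__qualname__", "")
    # Lambdas are named "<lambda>"; functions defined inside another
    # function carry "<locals>" in their qualified name.
    if not module_name or "<lambda>" in qualname or "<locals>" in qualname:
        return False
    try:
        obj: Any = importlib.import_module(module_name)
        for part in qualname.split("."):
            obj = getattr(obj, part)
    except (ImportError, AttributeError):
        return False
    # The looked-up object must be the very callable that was passed in.
    return obj is task


def make_closure() -> Callable[[], None]:
    def inner() -> None:
        return None

    return inner


print(is_importable_task(json.dumps))      # module-level function
print(is_importable_task(lambda: None))    # lambda
print(is_importable_task(make_closure()))  # nested function
```

The three calls print `True`, `False`, and `False`. In practice this means handlers are best defined at module level in a module that the worker process can import.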
upsert(task, *args, run_at=None, priority=Priority.NORMAL, client_id, **kwargs)
Enqueue a task, updating if client_id already exists.
Behaves like enqueue(), except that when a task with the same client_id already exists, all of its fields are overwritten: status resets to PENDING, attempts resets to 0, and timestamps are cleared.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | Function to execute. | required |
| `*args` | `Any` | Positional arguments to pass to the handler. | `()` |
| `run_at` | `datetime \| None` | When to run the task. Defaults to now. | `None` |
| `priority` | `Priority` | Task priority. Higher values mean higher priority. Defaults to NORMAL. | `NORMAL` |
| `client_id` | `str` | Client-provided identifier. Required for conflict resolution. | required |
| `**kwargs` | `Any` | Keyword arguments to pass to the handler. | `{}` |

Returns:
| Type | Description |
|---|---|
| `int` | Task ID. |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If task is a lambda, closure, or cannot be imported. |
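The conflict behavior of upsert() can be pictured with a small in-memory model. This is only a sketch of the documented semantics, not PQ's implementation: `TaskRow`, `upsert_row`, the field names, and the `"FAILED"` status string are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class TaskRow:
    """Hypothetical stand-in for a stored task; field names are illustrative."""

    client_id: str
    kwargs: Dict[str, Any] = field(default_factory=dict)
    status: str = "PENDING"
    attempts: int = 0
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


def upsert_row(table: Dict[str, TaskRow], client_id: str, **kwargs: Any) -> TaskRow:
    """Insert a row, or replace the existing row that has the same client_id."""
    # A freshly built row already has the reset state: PENDING, zero
    # attempts, and no timestamps. Replacing the old row applies it.
    row = TaskRow(client_id=client_id, kwargs=kwargs)
    table[client_id] = row
    return row


table: Dict[str, TaskRow] = {}
upsert_row(table, "report-42", recipient="a@example.com")

# Simulate a worker having tried the task and failed (assumed status name).
table["report-42"].status = "FAILED"
table["report-42"].attempts = 3
table["report-42"].started_at = datetime.now(timezone.utc)

# Same client_id: the row is overwritten instead of raising a conflict.
row = upsert_row(table, "report-42", recipient="b@example.com")
print(row.status, row.attempts, row.started_at, row.kwargs)
```

The final line prints `PENDING 0 None {'recipient': 'b@example.com'}`: the second call replaced the arguments and reset the execution state, where enqueue() with a duplicate client_id would have raised IntegrityError.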
schedule(task, *args, run_every=None, cron=None, priority=Priority.NORMAL, client_id=None, max_concurrent=1, key='', active=True, **kwargs)
Schedule a periodic task.
If a periodic task with this function already exists, it is updated. Exactly one of run_every or cron must be provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | Function to execute. | required |
| `*args` | `Any` | Positional arguments to pass to the handler. | `()` |
| `run_every` | `timedelta \| None` | Interval between executions (e.g., `timedelta(hours=1)`). | `None` |
| `cron` | `str \| croniter \| None` | Cron expression string (e.g., `"0 9 * * 1"`) or croniter object. | `None` |
| `priority` | `Priority` | Task priority. Higher values mean higher priority. Defaults to NORMAL. | `NORMAL` |
| `client_id` | `str \| None` | Optional client-provided identifier. Must be unique if provided. | `None` |
| `max_concurrent` | `int \| None` | Maximum concurrent executions. Default 1 (no overlap). Set to None for unlimited concurrency. Values > 1 are reserved for future use and raise ValueError. | `1` |
| `key` | `str` | Discriminator for multiple schedules of the same function. Defaults to "" (empty string). | `''` |
| `active` | `bool` | Whether the task is active. Inactive tasks are not executed. Defaults to True. | `True` |
| `**kwargs` | `Any` | Keyword arguments to pass to the handler. | `{}` |

Returns:
| Type | Description |
|---|---|
| `int` | Periodic task ID. |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If neither run_every nor cron is provided, or if both are. |
| `ValueError` | If cron expression is invalid. |
| `ValueError` | If max_concurrent is greater than 1. |
| `ValueError` | If task is a lambda, closure, or cannot be imported. |
| `IntegrityError` | If client_id already exists. |
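The argument rules for schedule() are easy to get wrong, so the following sketch restates two of the documented `ValueError` conditions as code. `validate_schedule_args` and `is_valid` are hypothetical helpers, not PQ functions, and the sketch does not check whether the cron string itself is well formed.

```python
from datetime import timedelta
from typing import Any, Optional


def validate_schedule_args(
    run_every: Optional[timedelta] = None,
    cron: Optional[str] = None,
    max_concurrent: Optional[int] = 1,
) -> None:
    """Hypothetical mirror of two documented ValueError conditions."""
    # Exactly one of run_every / cron must be set.
    if (run_every is None) == (cron is None):
        raise ValueError("provide exactly one of run_every or cron")
    # None means unlimited, 1 means no overlap, anything above 1 is rejected.
    if max_concurrent is not None and max_concurrent > 1:
        raise ValueError("max_concurrent > 1 is reserved for future use")


def is_valid(**kwargs: Any) -> bool:
    """Return True if the argument combination passes validation."""
    try:
        validate_schedule_args(**kwargs)
    except ValueError:
        return False
    return True


print(is_valid(run_every=timedelta(hours=1)))                    # interval only
print(is_valid(cron="0 9 * * 1"))                                # cron only
print(is_valid())                                                # neither
print(is_valid(run_every=timedelta(hours=1), cron="0 9 * * 1"))  # both
print(is_valid(cron="0 9 * * 1", max_concurrent=None))           # unlimited
print(is_valid(cron="0 9 * * 1", max_concurrent=2))              # reserved
```

The calls print `True`, `True`, `False`, `False`, `True`, `False`, matching the rows of the Raises table for these two conditions.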
unschedule(task, *, key='')
Remove a periodic task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | The scheduled function to remove. | required |
| `key` | `str` | Discriminator key. Defaults to "" (the default schedule). | `''` |

Returns:
| Type | Description |
|---|---|
| `bool` | True if task was found and deleted, False otherwise. |
cancel(task_id)
Cancel a one-off task by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `int` | Task ID. | required |

Returns:
| Type | Description |
|---|---|
| `bool` | True if task was found and deleted, False otherwise. |
run_worker(*, poll_interval=1.0, max_runtime=30 * 60, priorities=None)
Run the worker loop (blocking).
Each task executes in a forked child process for memory isolation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `poll_interval` | `float` | Seconds to sleep between polls when idle. | `1.0` |
| `max_runtime` | `float` | Maximum execution time per task in seconds. Default: 30 min. | `30 * 60` |
| `priorities` | `Set[Priority] \| None` | If set, only process tasks with these priority levels. Use this to dedicate workers to specific priority tiers. | `None` |
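To illustrate what running each task in a forked child process buys in terms of memory isolation, here is a minimal POSIX-only sketch built on `os.fork`. It is not PQ's worker code: `run_in_fork`, `handler`, `failing_handler`, and `state` are hypothetical names, and the sketch does not enforce `max_runtime`.

```python
import os
from typing import Callable

state = {"runs": 0}


def handler() -> None:
    # Mutates memory; inside a forked child this touches only the child's copy.
    state["runs"] += 1


def failing_handler() -> None:
    raise RuntimeError("boom")


def run_in_fork(fn: Callable[[], None]) -> int:
    """Run fn in a forked child and return its exit code (POSIX only)."""
    pid = os.fork()
    if pid == 0:
        # Child: run the handler, then exit immediately so that no parent
        # code (or cleanup handlers) runs a second time in the child.
        exit_code = 0
        try:
            fn()
        except BaseException:
            exit_code = 1
        os._exit(exit_code)
    # Parent: block until the child finishes and decode its status.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else -1


print(run_in_fork(handler), state["runs"])
print(run_in_fork(failing_handler))
```

The first line prints `0 0`: the child exited cleanly, yet the parent's `state` is unchanged because the increment happened in the child's copy of memory. The second prints `1`, showing that a crashing handler surfaces as an exit code and does not take the worker loop down with it.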
run_worker_once(*, max_runtime=30 * 60, priorities=None)
Process a single task if available.
Each task executes in a forked child process for memory isolation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_runtime` | `float` | Maximum execution time per task in seconds. Default: 30 min. | `30 * 60` |
| `priorities` | `Set[Priority] \| None` | If set, only process tasks with these priority levels. | `None` |

Returns:
| Type | Description |
|---|---|
| `bool` | True if a task was processed, False if queue was empty. |
pending_count()
Count pending one-off tasks.
periodic_count()
Count periodic task schedules.
get_task(task_id)
Get a task by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `int` | Task ID. | required |

Returns:
| Type | Description |
|---|---|
| `Task \| None` | Task object or None if not found. |
get_task_by_client_id(client_id)
Get a task by client_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `client_id` | `str` | Client-provided identifier. | required |

Returns:
| Type | Description |
|---|---|
| `Task \| None` | Task object or None if not found. |
get_periodic_by_client_id(client_id)
Get a periodic task by client_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `client_id` | `str` | Client-provided identifier. | required |

Returns:
| Type | Description |
|---|---|
| `Periodic \| None` | Periodic object or None if not found. |
list_failed(limit=100)
List failed tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int` | Maximum number of tasks to return. | `100` |

Returns:
| Type | Description |
|---|---|
| `list[Task]` | List of failed tasks, most recent first. |
list_completed(limit=100)
List completed tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int` | Maximum number of tasks to return. | `100` |

Returns:
| Type | Description |
|---|---|
| `list[Task]` | List of completed tasks, most recent first. |
clear_failed(before=None)
Clear failed tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `before` | `datetime \| None` | Only clear tasks failed before this time. If None, clears all. | `None` |

Returns:
| Type | Description |
|---|---|
| `int` | Number of tasks deleted. |
clear_completed(before=None)
Clear completed tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `before` | `datetime \| None` | Only clear tasks completed before this time. If None, clears all. | `None` |

Returns:
| Type | Description |
|---|---|
| `int` | Number of tasks deleted. |
clear_all()
Clear all tasks and periodic schedules.
Priority
TaskStatus
Models
Worker Hooks
Bases: Protocol
Protocol for pre-execution hooks called before task handler runs.
Called in the forked child process before task execution. Use for initializing fork-unsafe resources (OTel, DB connections).
Bases: Protocol
Protocol for post-execution hooks called after task handler completes.
Called in the forked child process after task execution (success or failure). Use for cleanup/flushing (OTel traces, etc.).
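The ordering of the two hook types around a task can be sketched as follows. The hook signatures are not given above, so every name and parameter in this block (`PreExecuteHook`, `PostExecuteHook`, `task_name`, `error`, `run_with_hooks`) is an assumption made for illustration and should not be read as PQ's actual protocol definitions.

```python
from typing import Callable, List, Optional, Protocol

events: List[str] = []


class PreExecuteHook(Protocol):
    """Hypothetical pre-execution hook shape; the real signature may differ."""

    def __call__(self, task_name: str) -> None: ...


class PostExecuteHook(Protocol):
    """Hypothetical post-execution hook shape; the real signature may differ."""

    def __call__(self, task_name: str, error: Optional[BaseException]) -> None: ...


def init_resources(task_name: str) -> None:
    # Stand-in for setting up fork-unsafe resources such as tracing or DB clients.
    events.append(f"pre:{task_name}")


def flush_resources(task_name: str, error: Optional[BaseException]) -> None:
    # Stand-in for cleanup and flushing; runs on success and on failure.
    outcome = "error" if error is not None else "ok"
    events.append(f"post:{task_name}:{outcome}")


def run_with_hooks(
    task_name: str,
    handler: Callable[[], None],
    pre: PreExecuteHook,
    post: PostExecuteHook,
) -> None:
    """Call pre, then the handler, then post regardless of the outcome."""
    pre(task_name)
    error: Optional[BaseException] = None
    try:
        handler()
    except Exception as exc:
        # Record the failure so the post hook can see it.
        error = exc
    post(task_name, error)


def ok_handler() -> None:
    return None


def bad_handler() -> None:
    raise ValueError("bad input")


run_with_hooks("ok_task", ok_handler, init_resources, flush_resources)
run_with_hooks("bad_task", bad_handler, init_resources, flush_resources)
print(events)
```

This prints `['pre:ok_task', 'post:ok_task:ok', 'pre:bad_task', 'post:bad_task:error']`. Using `Protocol` means any callable with a matching signature qualifies as a hook, with no base class to inherit from.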