Skip to content

API Reference

PQ

Postgres-backed task queue client.

__init__(database_url)

Initialize PQ with database connection.

Parameters:

Name Type Description Default
database_url str

PostgreSQL connection string.

required

run_db_migrations()

Run database migrations to latest version.

Call this once at application startup before using the queue. Uses Alembic to apply any pending migrations. Safe to call multiple times - only pending migrations are applied.

Example

pq = PQ("postgresql://localhost/mydb") pq.run_db_migrations()

enqueue(task, *args, run_at=None, priority=Priority.NORMAL, client_id=None, **kwargs)

Enqueue a one-off task.

Parameters:

Name Type Description Default
task Callable[..., Any]

Callable function to execute.

required
*args Any

Positional arguments to pass to the handler.

()
run_at datetime | None

When to run the task. Defaults to now.

None
priority Priority

Task priority. Higher = higher priority. Defaults to NORMAL.

NORMAL
client_id str | None

Optional client-provided identifier. Must be unique if provided.

None
**kwargs Any

Keyword arguments to pass to the handler.

{}

Returns:

Type Description
int

Task ID.

Raises:

Type Description
ValueError

If task is a lambda, closure, or cannot be imported.

IntegrityError

If client_id already exists.

upsert(task, *args, run_at=None, priority=Priority.NORMAL, client_id, **kwargs)

Enqueue a task, updating if client_id already exists.

Behaves like enqueue(), but on conflict for client_id, updates all fields. Status resets to PENDING, attempts to 0, and timestamps are cleared.

Parameters:

Name Type Description Default
task Callable[..., Any]

Callable function to execute.

required
*args Any

Positional arguments to pass to the handler.

()
run_at datetime | None

When to run the task. Defaults to now.

None
priority Priority

Task priority. Higher = higher priority. Defaults to NORMAL.

NORMAL
client_id str

Client-provided identifier. Required for conflict resolution.

required
**kwargs Any

Keyword arguments to pass to the handler.

{}

Returns:

Type Description
int

Task ID.

Raises:

Type Description
ValueError

If task is a lambda, closure, or cannot be imported.

schedule(task, *args, run_every=None, cron=None, priority=Priority.NORMAL, client_id=None, max_concurrent=1, key='', active=True, **kwargs)

Schedule a periodic task.

If a periodic task with this function already exists, it will be updated. Either run_every or cron must be provided, but not both.

Parameters:

Name Type Description Default
task Callable[..., Any]

Callable function to execute.

required
*args Any

Positional arguments to pass to the handler.

()
run_every timedelta | None

Interval between executions (e.g., timedelta(hours=1)).

None
cron str | croniter | None

Cron expression string (e.g., "0 9 * * 1") or croniter object.

None
priority Priority

Task priority. Higher = higher priority. Defaults to NORMAL.

NORMAL
client_id str | None

Optional client-provided identifier. Must be unique if provided.

None
max_concurrent int | None

Maximum concurrent executions. Default 1 (no overlap). Set to None for unlimited concurrency. Values > 1 are reserved for future use and raise ValueError.

1
key str

Discriminator for multiple schedules of the same function. Defaults to "" (empty string).

''
active bool

Whether the task is active. Inactive tasks are not executed. Defaults to True.

True
**kwargs Any

Keyword arguments to pass to the handler.

{}

Returns:

Type Description
int

Periodic task ID.

Raises:

Type Description
ValueError

If neither run_every nor cron is provided, or if both are.

ValueError

If cron expression is invalid.

ValueError

If max_concurrent is greater than 1.

ValueError

If task is a lambda, closure, or cannot be imported.

IntegrityError

If client_id already exists.

unschedule(task, *, key='')

Remove a periodic task.

Parameters:

Name Type Description Default
task Callable[..., Any]

The scheduled function to remove.

required
key str

Discriminator key. Defaults to "" (the default schedule).

''

Returns:

Type Description
bool

True if task was found and deleted, False otherwise.

cancel(task_id)

Cancel a one-off task by ID.

Parameters:

Name Type Description Default
task_id int

Task ID.

required

Returns:

Type Description
bool

True if task was found and deleted, False otherwise.

run_worker(*, poll_interval=1.0, max_runtime=30 * 60, priorities=None)

Run the worker loop (blocking).

Each task executes in a forked child process for memory isolation.

Parameters:

Name Type Description Default
poll_interval float

Seconds to sleep between polls when idle.

1.0
max_runtime float

Maximum execution time per task in seconds. Default: 30 min.

30 * 60
priorities Set[Priority] | None

If set, only process tasks with these priority levels. Use this to dedicate workers to specific priority tiers.

None

run_worker_once(*, max_runtime=30 * 60, priorities=None)

Process a single task if available.

Each task executes in a forked child process for memory isolation.

Parameters:

Name Type Description Default
max_runtime float

Maximum execution time per task in seconds. Default: 30 min.

30 * 60
priorities Set[Priority] | None

If set, only process tasks with these priority levels.

None

Returns:

Type Description
bool

True if a task was processed, False if queue was empty.

pending_count()

Count pending one-off tasks.

periodic_count()

Count periodic task schedules.

get_task(task_id)

Get a task by ID.

Parameters:

Name Type Description Default
task_id int

Task ID.

required

Returns:

Type Description
Task | None

Task object or None if not found.

get_task_by_client_id(client_id)

Get a task by client_id.

Parameters:

Name Type Description Default
client_id str

Client-provided identifier.

required

Returns:

Type Description
Task | None

Task object or None if not found.

get_periodic_by_client_id(client_id)

Get a periodic task by client_id.

Parameters:

Name Type Description Default
client_id str

Client-provided identifier.

required

Returns:

Type Description
Periodic | None

Periodic object or None if not found.

list_failed(limit=100)

List failed tasks.

Parameters:

Name Type Description Default
limit int

Maximum number of tasks to return.

100

Returns:

Type Description
list[Task]

List of failed tasks, most recent first.

list_completed(limit=100)

List completed tasks.

Parameters:

Name Type Description Default
limit int

Maximum number of tasks to return.

100

Returns:

Type Description
list[Task]

List of completed tasks, most recent first.

clear_failed(before=None)

Clear failed tasks.

Parameters:

Name Type Description Default
before datetime | None

Only clear tasks failed before this time. If None, clears all.

None

Returns:

Type Description
int

Number of tasks deleted.

clear_completed(before=None)

Clear completed tasks.

Parameters:

Name Type Description Default
before datetime | None

Only clear tasks completed before this time. If None, clears all.

None

Returns:

Type Description
int

Number of tasks deleted.

clear_all()

Clear all tasks and periodic schedules.

Priority

Bases: IntEnum

Task priority levels.

Higher values = higher priority (processed first).

TaskStatus

Bases: StrEnum

Task execution status.

Models

Bases: Base

One-off task with status tracking.

Bases: Base

Recurring task with interval or cron scheduling.

Worker Hooks

Bases: Protocol

Protocol for pre-execution hooks called before task handler runs.

Called in the forked child process before task execution. Use for initializing fork-unsafe resources (OTel, DB connections).

__call__(task)

Called before task execution.

Parameters:

Name Type Description Default
task Task | Periodic

The task about to be executed.

required

Bases: Protocol

Protocol for post-execution hooks called after task handler completes.

Called in the forked child process after task execution (success or failure). Use for cleanup/flushing (OTel traces, etc.).

__call__(task, error)

Called after task execution.

Parameters:

Name Type Description Default
task Task | Periodic

The task that was executed.

required
error Exception | None

The exception if task failed, None if successful.

required

Exceptions

Bases: WorkerError

Raised when a task exceeds its max runtime.