Background Jobs API Reference¶
Complete API reference for RapidAI's background job system.
Decorator¶
background¶
Decorator to run a function as a background job.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
max_retries |
int |
3 |
Maximum retry attempts on failure |
queue |
JobQueue |
None |
Custom job queue (uses default if None) |
Returns: Decorated function with enqueue(), get_result(), and cancel() methods
Example:
from rapidai.background import background
@background(max_retries=5)
async def process_task(data: str):
return {"processed": data}
# Enqueue
job_id = await process_task.enqueue(data="test")
# Get result
result = await process_task.get_result(job_id)
# Cancel
cancelled = await process_task.cancel(job_id)
Classes¶
JobResult¶
@dataclass
class JobResult:
job_id: str
status: JobStatus
result: Any = None
error: Optional[str] = None
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
attempts: int = 0
max_retries: int = 3
Properties:
| Property | Type | Description |
|---|---|---|
job_id |
str |
Unique job identifier |
status |
JobStatus |
Current job status |
result |
Any |
Job result (if completed) |
error |
str |
Error message (if failed) |
created_at |
datetime |
When job was created |
started_at |
datetime |
When job started running |
completed_at |
datetime |
When job finished |
attempts |
int |
Number of execution attempts |
max_retries |
int |
Maximum retry attempts |
Methods:
duration¶
Get job duration in seconds.
Returns: float - Duration in seconds, or None if not completed
is_done¶
Check if job is in a terminal state.
Returns: bool - True if completed, failed, or cancelled
JobStatus¶
class JobStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
Values:
PENDING- Job queued, waiting to startRUNNING- Job currently executingCOMPLETED- Job finished successfullyFAILED- Job failed after all retriesCANCELLED- Job was cancelled
JobQueue¶
class JobQueue:
async def enqueue(
self,
job_id: str,
func: Callable,
args: tuple,
kwargs: dict,
max_retries: int = 3
) -> str
async def get_result(self, job_id: str) -> Optional[JobResult]
async def cancel(self, job_id: str) -> bool
async def list_jobs(
self,
status: Optional[JobStatus] = None
) -> List[JobResult]
Base class for job queue backends.
InMemoryQueue¶
In-memory job queue implementation.
Features: - Fast execution - No external dependencies - Jobs lost on restart - Single-server only
Example:
RedisQueue¶
class RedisQueue(JobQueue):
def __init__(
self,
url: str = "redis://localhost:6379",
prefix: str = "rapidai:jobs:"
) -> None
Redis-backed job queue implementation.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
url |
str |
"redis://localhost:6379" |
Redis connection URL |
prefix |
str |
"rapidai:jobs:" |
Key prefix for namespacing |
Features: - Persistent storage - Survives restarts - Multi-server support - Production-ready
Example:
from rapidai.background import RedisQueue
queue = RedisQueue(
url="redis://localhost:6379",
prefix="myapp:jobs:"
)
Functions¶
get_queue¶
Get or create job queue.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
backend |
str |
"memory" |
Queue backend ("memory" or "redis") |
**kwargs |
Any |
- | Backend-specific arguments |
Returns: JobQueue - Queue instance
Example:
from rapidai.background import get_queue
# In-memory queue
queue = get_queue(backend="memory")
# Redis queue
queue = get_queue(
backend="redis",
url="redis://localhost:6379"
)
Exceptions¶
JobError¶
Raised for job-related errors.
Complete Example¶
from rapidai import App
from rapidai.background import background, get_queue, JobStatus
app = App()
# Configure Redis queue for production
queue = get_queue(
backend="redis",
url="redis://localhost:6379"
)
@background(max_retries=3, queue=queue)
async def process_data(data_id: str):
"""Process data with automatic retries."""
# Simulate processing
result = await external_api.process(data_id)
return {"data_id": data_id, "result": result}
@app.route("/process", methods=["POST"])
async def start_processing(data_id: str):
"""Start background processing."""
job_id = await process_data.enqueue(data_id=data_id)
return {
"job_id": job_id,
"status": "queued",
"check_url": f"/jobs/{job_id}"
}
@app.route("/jobs/<job_id>", methods=["GET"])
async def get_job_status(job_id: str):
"""Get job status and result."""
result = await process_data.get_result(job_id)
if not result:
return {"error": "Job not found"}, 404
response = {
"job_id": result.job_id,
"status": result.status.value,
"created_at": result.created_at.isoformat(),
"attempts": result.attempts
}
if result.is_done:
response["completed_at"] = result.completed_at.isoformat()
response["duration"] = result.duration
if result.status == JobStatus.COMPLETED:
response["result"] = result.result
elif result.status == JobStatus.FAILED:
response["error"] = result.error
return response
@app.route("/jobs/<job_id>", methods=["DELETE"])
async def cancel_job(job_id: str):
"""Cancel a running job."""
cancelled = await process_data.cancel(job_id)
if cancelled:
return {"status": "cancelled"}
else:
return {"error": "Cannot cancel job"}, 400
@app.route("/jobs", methods=["GET"])
async def list_all_jobs(status: str = None):
"""List all jobs, optionally filtered by status."""
job_status = JobStatus(status) if status else None
jobs = await queue.list_jobs(status=job_status)
return {
"jobs": [
{
"job_id": job.job_id,
"status": job.status.value,
"created_at": job.created_at.isoformat()
}
for job in jobs
]
}
See Also¶
- Background Jobs Guide - Complete usage guide
- Monitoring - Track job performance
- Testing - Test background jobs