
python-background-jobs

Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.

Stars: 36,167
Source: wshobson/agents
Updated: 2026-05-29
Slug: wshobson--agents--python-background-jobs

// install — copy + paste into any project

mkdir -p .claude/skills && curl -fsSL https://raw.githubusercontent.com/wshobson/agents/HEAD/plugins/python-development/skills/python-background-jobs/SKILL.md -o .claude/skills/python-background-jobs.md

Drops the SKILL.md into .claude/skills/python-background-jobs.md. Works with Claude Code, Cursor, and any agent that loads SKILL.md files from .claude/skills/.

Python Background Jobs & Task Queues

Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.

When to Use This Skill

  • Processing tasks that take longer than a few seconds
  • Sending emails, notifications, or webhooks
  • Generating reports or exporting data
  • Processing uploads or media transformations
  • Integrating with unreliable external services
  • Building event-driven architectures

Core Concepts

1. Task Queue Pattern

The API accepts a request, enqueues a job, and returns immediately with a job ID. Workers then process the queued jobs asynchronously.

2. Idempotency

Tasks may be retried on failure. Design for safe re-execution.

3. Job State Machine

Jobs transition through states: pending → running → succeeded/failed.
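
The transitions above can be enforced in code rather than left as a convention. The following is a minimal sketch, assuming a hypothetical `ALLOWED_TRANSITIONS` table and `transition` helper that are not part of the skill itself:

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Hypothetical transition table: each state maps to the states it may move to.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED},
    JobStatus.SUCCEEDED: set(),  # terminal state
    JobStatus.FAILED: set(),     # terminal state
}


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new status, or raise if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {new.value}")
    return new
```

Whether a failed job may return to pending for a retry is a design choice; this sketch treats failed as terminal.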

4. At-Least-Once Delivery

Most queues guarantee at-least-once delivery. Your code must handle duplicates.

Quick Start

The examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker. `email_client` is a placeholder for
    # whatever mail-sending client the application already uses.
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")

Fundamental Patterns

Pattern 1: Return Job ID Immediately

For operations exceeding a few seconds, return a job ID and process asynchronously.

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from uuid import uuid4

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint. ExportRequest, JobResponse, jobs_repo, and task_queue are
# application-specific and assumed to be defined elsewhere.
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record before enqueueing, so polling never sees a missing job
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        created_at=datetime.now(timezone.utc),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status=JobStatus.PENDING.value,
        poll_url=f"/jobs/{job_id}",
    )

Pattern 2: Celery Task Configuration

Configure Celery tasks with explicit retry and time-limit settings.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,      # Soft limit: 50 minutes
    task_acks_late=True,            # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,   # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
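
The `countdown` expression above doubles the delay on each retry. Here is a small worked sketch of that arithmetic; the `backoff_delay` helper and its `cap` parameter are illustrative additions, not Celery APIs:

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 3600) -> int:
    """Delay in seconds before the next attempt: base * 2**retries, capped."""
    return min(cap, base * 2 ** retries)


# With max_retries=3, self.request.retries is 0, 1, 2 when a retry is scheduled
delays = [backoff_delay(n) for n in range(3)]
print(delays)  # [60, 120, 240]
```

For tasks that rely on `autoretry_for`, Celery also provides the `retry_backoff`, `retry_backoff_max`, and `retry_jitter` task options, which may remove the need for a hand-written countdown.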

Pattern 3: Make Tasks Idempotent

Workers may retry on crash or timeout. Design for safe re-execution.

@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? A previous attempt may have crashed mid-way.
    if order.status == OrderStatus.PROCESSING:
        # Safe to continue: the idempotency key below prevents double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)

Idempotency Strategies:

  1. Check-before-write: Verify state before action
  2. Idempotency keys: Use unique tokens with external services
  3. Upsert patterns: INSERT ... ON CONFLICT DO UPDATE
  4. Deduplication window: Track processed IDs for N hours
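
Strategy 3 can be sketched with SQLite's upsert syntax (available in SQLite 3.24 and later; PostgreSQL accepts the same `ON CONFLICT ... DO UPDATE` form). The `order_totals` table and `record_total` helper are hypothetical, chosen only to show that running the same write twice leaves one row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE order_totals (order_id TEXT PRIMARY KEY, total INTEGER NOT NULL)"
)


def record_total(order_id: str, total: int) -> None:
    """Insert the row, or overwrite it if this order was already recorded."""
    conn.execute(
        """INSERT INTO order_totals (order_id, total)
           VALUES (?, ?)
           ON CONFLICT(order_id) DO UPDATE SET total = excluded.total""",
        (order_id, total),
    )
    conn.commit()


# Running the same write twice, as a retried task would, leaves exactly one row
record_total("order-1", 500)
record_total("order-1", 500)
rows = conn.execute("SELECT order_id, total FROM order_totals").fetchall()
print(rows)  # [('order-1', 500)]
```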

Pattern 4: Job State Management

Persist job state transitions for visibility and debugging.

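from datetime import datetime, timezone

class JobRepository:
    """Repository for managing job state."""

    # Columns that update_status may write; names are interpolated into SQL
    _UPDATABLE = {"status", "started_at", "completed_at", "result", "error"}

    def __init__(self, db) -> None:
        # Assumed: an asyncpg-style connection or pool ($1-style placeholders)
        self._db = db

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}

        # Timezone-aware UTC (assumes timestamptz columns);
        # datetime.utcnow() is deprecated since Python 3.12
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.now(timezone.utc)
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.now(timezone.utc)

        unknown = set(updates) - self._UPDATABLE
        if unknown:
            raise ValueError(f"Unknown job columns: {sorted(unknown)}")

        # Build "column = $n" pairs; the values themselves stay parameterized
        set_clause = ", ".join(
            f"{column} = ${i}" for i, column in enumerate(updates, start=1)
        )
        await self._db.execute(
            f"UPDATE jobs SET {set_clause} WHERE id = ${len(updates) + 1}",
            *updates.values(), job_id,
        )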

        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
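
A worker can drive these transitions around the actual work. The sketch below uses a hypothetical in-memory `InMemoryJobs` stand-in rather than the `JobRepository` above, and `run_job` is an illustrative name:

```python
import asyncio
from typing import Awaitable, Callable


class InMemoryJobs:
    """Hypothetical stand-in for a job repository; records every status change."""

    def __init__(self) -> None:
        self.history: dict[str, list[str]] = {}
        self.fields: dict[str, dict] = {}

    async def update_status(self, job_id: str, status: str, **fields) -> None:
        self.history.setdefault(job_id, []).append(status)
        self.fields.setdefault(job_id, {}).update(fields)


async def run_job(
    repo: InMemoryJobs,
    job_id: str,
    work: Callable[[], Awaitable[dict]],
) -> None:
    """Mark the job running, execute it, then record success or failure."""
    await repo.update_status(job_id, "running")
    try:
        result = await work()
    except Exception as exc:
        # Store the error so the failure is visible when polling
        await repo.update_status(job_id, "failed", error=str(exc))
        return
    await repo.update_status(job_id, "succeeded", result=result)


async def export_rows() -> dict:
    return {"rows": 10}


repo = InMemoryJobs()
asyncio.run(run_job(repo, "job-1", export_rows))
print(repo.history["job-1"])  # ['running', 'succeeded']
```

This sketch swallows the exception after recording it. Under a task queue with automatic retries, re-raising after the status update would be the more likely choice so that the queue can still schedule a retry.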

Detailed worked examples and patterns

Detailed sections (starting with ## Advanced Patterns) live in references/details.md. Read that file when the navigation summary above is insufficient.

Best Practices Summary

  1. Return immediately - Don't block requests for long operations
  2. Persist job state - Enable status polling and debugging
  3. Make tasks idempotent - Safe to retry on any failure
  4. Use idempotency keys - For external service calls
  5. Set timeouts - Both soft and hard limits
  6. Implement a dead-letter queue (DLQ) - Capture permanently failed tasks
  7. Log transitions - Track job state changes
  8. Retry appropriately - Exponential backoff for transient errors
  9. Don't retry permanent failures - Validation errors, invalid credentials
  10. Monitor queue depth - Alert on backlog growth
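
Practices 6, 8, and 9 name a dead-letter queue and a retry policy without showing how they interact. The following is a minimal in-memory sketch, assuming hypothetical names (`process_with_dlq`, `PermanentError`, `max_attempts`); real brokers usually provide their own dead-letter mechanism, and this sketch omits backoff delays for brevity.

```python
from collections import deque


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


def process_with_dlq(jobs, handler, max_attempts: int = 3):
    """Run each job; move jobs that fail permanently or exhaust retries to a DLQ."""
    queue = deque((job, 1) for job in jobs)  # (job, attempt number)
    dead_letters = []
    while queue:
        job, attempt = queue.popleft()
        try:
            handler(job)
        except PermanentError as exc:
            dead_letters.append((job, str(exc)))  # never retried
        except Exception as exc:
            if attempt >= max_attempts:
                dead_letters.append((job, str(exc)))  # retries exhausted
            else:
                queue.append((job, attempt + 1))  # requeue for another attempt
    return dead_letters
```

Jobs in the returned list can then be inspected, fixed, and replayed manually instead of being lost or retried forever.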