Le duo Celery + Flower

Celery exécute les tâches, mais il n'a pas d'UI : pour voir ce qu'il fait (tâches en cours, échouées, débit, workers connectés), on branche Flower par-dessus. C'est un peu l'équivalent de Prometheus + Grafana, mais spécifique à Celery. Lis cette fiche puis enchaîne sur la fiche Flower.

À quoi ça sert

Une route HTTP doit répondre vite (en quelques centaines de ms maximum). Mais parfois ce qu'elle déclenche est lent : envoyer un e-mail, générer un PDF, lancer une prédiction de modèle volumineux, importer un CSV. Si tu fais ça dans la route, l'utilisateur attend, l'API se bloque, et un timeout HTTP peut couper le travail à mi-chemin.

Celery découpe le problème :

  • L'API publie une tâche (un appel de fonction) dans une file (le broker) et répond immédiatement.
  • Un ou plusieurs workers Celery, des processus Python à part, consomment les tâches et les exécutent.
  • Le résultat (s'il y en a un) est stocké dans un result backend que ton API peut interroger.
  • En cas d'échec, Celery peut réessayer avec un backoff configurable.
En une phrase

Celery = le travailleur en arrière-plan de ton appli Python. Tu lui passes une fonction et des arguments, il la lance ailleurs, plus tard, et il réessaie si ça plante.

Un exemple d'usage

Une API FastAPI qui reçoit une image et lance une analyse longue (modèle vision). On veut renvoyer un task_id tout de suite, et laisser le client interroger plus tard pour récupérer le résultat.

python (tasks.py)
from celery import Celery

app = Celery(
    "monprojet",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@app.task(bind=True, max_retries=3)
def analyze_image(self, path: str) -> dict:
    try:
        result = heavy_inference(path)
        return {"label": result, "path": path}
    except ConnectionError as e:
        raise self.retry(exc=e, countdown=5)
python (côté API FastAPI)
from fastapi import FastAPI
from tasks import analyze_image, app as celery_app

api = FastAPI()

@api.post("/analyze")
def enqueue(path: str):
    task = analyze_image.delay(path)
    return {"task_id": task.id}

@api.get("/result/{task_id}")
def get_result(task_id: str):
    res = celery_app.AsyncResult(task_id)
    return {"state": res.state, "result": res.result if res.ready() else None}

L'API répond en quelques ms même si l'analyse dure 30 s. Le client récupère son résultat plus tard sur /result/<id>.

How-to : démarrer avec Celery

  1. Installer Celery + un broker

    On utilise Redis comme broker pour démarrer (le plus simple). Lance Redis en Docker (cf. fiche Redis), puis :

    bash
    uv add celery redis

    Pour utiliser RabbitMQ à la place, ajoute amqp://... comme broker URL — Celery est agnostique.

  2. Définir l'app et les tâches

    Crée tasks.py avec le contenu de l'exemple plus haut. Le pattern important : une tâche est une fonction normale décorée par @app.task. Aucune logique réseau dans ton code.

  3. Lancer un worker

    bash
    uv run celery -A tasks worker --loglevel=info

    Le worker se connecte au broker, déclare quelles tâches il sait exécuter, et attend. À chaque analyze_image.delay(...) depuis ton API, il prend la tâche et la lance.

    Pour scaler, lance plusieurs workers (--concurrency=4 pour 4 process en parallèle, ou -P gevent pour de l'I/O-bound).

  4. Appel synchrone vs asynchrone

    Trois façons d'appeler une tâche :

    • analyze_image.delay(path) — raccourci async, le plus courant.
    • analyze_image.apply_async(args=[path], countdown=10) — version complète avec options (délai, queue, priorité…).
    • analyze_image(path) — appel direct, sans broker. Pratique en test.

    Dans tous les cas async, tu récupères un AsyncResult avec un id, qu'on peut interroger plus tard (.ready(), .get(), .state).

  5. Retries et idempotence

    Avec bind=True et self.retry(...), ta tâche peut se replanifier en cas d'erreur transitoire (réseau, 429, etc.). Mais attention : une tâche peut être exécutée plusieurs fois (worker qui crashe avant ack, retry…). Écris-les idempotentes : la lancer 3 fois doit donner le même état final que de la lancer 1 fois (ex. utiliser un task_id comme clé d'insertion en base).

  6. Tâches périodiques (Celery Beat)

    Pour des jobs récurrents (« nettoyer la table chaque nuit »), ajoute le scheduler Beat :

    python (tasks.py)
    app.conf.beat_schedule = {
        "cleanup-nightly": {
            "task": "tasks.cleanup",
            "schedule": crontab(hour=3, minute=0),
        },
    }

    Lance Beat à côté du worker : celery -A tasks beat --loglevel=info. Pas plus d'un Beat à la fois — sinon les jobs sont déclenchés en double.

  7. Plusieurs queues

    Tu peux router certaines tâches vers une queue dédiée (par exemple gpu pour les inférences lourdes, traitée par un worker spécifique). Utile pour ne pas mélanger « petites tâches rapides » et « grosses tâches lentes » dans la même file.

  8. Voir ce qui se passe : Flower

    La CLI suffit pour debug rapide, mais pour suivre Celery au quotidien on lance Flower, l'UI web officielle. Le how-to est dans la fiche Flower.

Aide-mémoire

python (API la plus utilisée)
from celery import Celery
app = Celery("x", broker="redis://...", backend="redis://...")

@app.task
def add(a, b): return a + b

# Appels
add.delay(1, 2)
add.apply_async(args=[1, 2], countdown=5, queue="slow")

# Récupérer un résultat
res = app.AsyncResult(task_id)
res.state          # PENDING / STARTED / SUCCESS / FAILURE / RETRY
res.get(timeout=10)  # bloque, à éviter dans une route HTTP
CLI
celery -A tasks worker -l info --concurrency=4
celery -A tasks beat -l info
celery -A tasks inspect active
celery -A tasks inspect registered
celery -A tasks purge          # vide les queues (dev)

Celery et le reste de l'écosystème

  • Flower — l'UI temps réel pour Celery. Voir l'encart en haut.
  • Redis / RabbitMQ — les deux brokers recommandés. Redis = simple, rapide, parfait pour démarrer. RabbitMQ = plus robuste, meilleur pour des garanties strictes.
  • FastAPI — combo très commun. Pattern : la route publie une tâche, renvoie 202 + un task_id, le client poll /result/<id>.
  • Docker — typiquement 3 services dans le compose : ton API, un worker Celery, et le broker. Tous partagent l'image Python (juste la commande change).
  • Kubernetes — le worker est un Deployment scalable horizontalement. On peut auto-scaler selon la profondeur de la queue (KEDA + Redis/RabbitMQ scaler).
  • MLflow — pattern courant : une tâche Celery télécharge un modèle MLflow, fait l'inférence, renvoie le résultat. L'API reste rapide, l'inférence est isolée.

Pour aller plus loin