Celery
Le grand classique Python pour exécuter des tâches en arrière-plan : tu écris des fonctions, Celery les distribue à des workers via un broker (Redis ou RabbitMQ), gère les retries et les résultats.
Celery exécute les tâches, mais il n'a pas d'UI : pour voir ce qu'il fait (tâches en cours, échouées, débit, workers connectés), on branche Flower par-dessus. C'est un peu l'équivalent de Prometheus + Grafana, mais spécifique à Celery. Lis cette fiche puis enchaîne sur la fiche Flower.
À quoi ça sert
Une route HTTP doit répondre vite (en quelques centaines de ms maximum). Mais parfois ce qu'elle déclenche est lent : envoyer un e-mail, générer un PDF, lancer une prédiction de modèle volumineux, importer un CSV. Si tu fais ça dans la route, l'utilisateur attend, l'API se bloque, et un timeout HTTP peut couper le travail à mi-chemin.
Celery découpe le problème :
- L'API publie une tâche (un appel de fonction) dans une file (le broker) et répond immédiatement.
- Un ou plusieurs workers Celery, des processus Python à part, consomment les tâches et les exécutent.
- Le résultat (s'il y en a un) est stocké dans un result backend que ton API peut interroger.
- En cas d'échec, Celery peut réessayer avec un backoff configurable.
Celery = le travailleur en arrière-plan de ton appli Python. Tu lui passes une fonction et des arguments, il la lance ailleurs, plus tard, et il réessaie si ça plante.
Un exemple d'usage
Une API FastAPI qui reçoit une image et lance une analyse longue (modèle
vision). On veut renvoyer un task_id tout de suite, et
laisser le client interroger plus tard pour récupérer le résultat.
from celery import Celery
app = Celery(
"monprojet",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
@app.task(bind=True, max_retries=3)
def analyze_image(self, path: str) -> dict:
try:
result = heavy_inference(path)
return {"label": result, "path": path}
except ConnectionError as e:
raise self.retry(exc=e, countdown=5)
from fastapi import FastAPI
from tasks import analyze_image, app as celery_app
api = FastAPI()
@api.post("/analyze")
def enqueue(path: str):
task = analyze_image.delay(path)
return {"task_id": task.id}
@api.get("/result/{task_id}")
def get_result(task_id: str):
res = celery_app.AsyncResult(task_id)
return {"state": res.state, "result": res.result if res.ready() else None}
L'API répond en quelques ms même si l'analyse dure 30 s. Le client
récupère son résultat plus tard sur /result/<id>.
How-to : démarrer avec Celery
-
Installer Celery + un broker
On utilise Redis comme broker pour démarrer (le plus simple). Lance Redis en Docker (cf. fiche Redis), puis :
bashuv add celery redisPour utiliser RabbitMQ à la place, ajoute
amqp://...comme broker URL — Celery est agnostique. -
Définir l'app et les tâches
Crée
tasks.pyavec le contenu de l'exemple plus haut. Le pattern important : une tâche est une fonction normale décorée par@app.task. Aucune logique réseau dans ton code. -
Lancer un worker
bashuv run celery -A tasks worker --loglevel=infoLe worker se connecte au broker, déclare quelles tâches il sait exécuter, et attend. À chaque
analyze_image.delay(...)depuis ton API, il prend la tâche et la lance.Pour scaler, lance plusieurs workers (
--concurrency=4pour 4 process en parallèle, ou-P geventpour de l'I/O-bound). -
Appel synchrone vs asynchrone
Trois façons d'appeler une tâche :
analyze_image.delay(path)— raccourci async, le plus courant.analyze_image.apply_async(args=[path], countdown=10)— version complète avec options (délai, queue, priorité…).analyze_image(path)— appel direct, sans broker. Pratique en test.
Dans tous les cas async, tu récupères un
AsyncResultavec unid, qu'on peut interroger plus tard (.ready(),.get(),.state). -
Retries et idempotence
Avec
bind=Trueetself.retry(...), ta tâche peut se replanifier en cas d'erreur transitoire (réseau, 429, etc.). Mais attention : une tâche peut être exécutée plusieurs fois (worker qui crashe avant ack, retry…). Écris-les idempotentes : la lancer 3 fois doit donner le même état final que de la lancer 1 fois (ex. utiliser untask_idcomme clé d'insertion en base). -
Tâches périodiques (Celery Beat)
Pour des jobs récurrents (« nettoyer la table chaque nuit »), ajoute le scheduler Beat :
python (tasks.py)app.conf.beat_schedule = { "cleanup-nightly": { "task": "tasks.cleanup", "schedule": crontab(hour=3, minute=0), }, }Lance Beat à côté du worker :
celery -A tasks beat --loglevel=info. Pas plus d'un Beat à la fois — sinon les jobs sont déclenchés en double. -
Plusieurs queues
Tu peux router certaines tâches vers une queue dédiée (par exemple
gpupour les inférences lourdes, traitée par un worker spécifique). Utile pour ne pas mélanger « petites tâches rapides » et « grosses tâches lentes » dans la même file. -
Voir ce qui se passe : Flower
La CLI suffit pour debug rapide, mais pour suivre Celery au quotidien on lance Flower, l'UI web officielle. Le how-to est dans la fiche Flower.
Aide-mémoire
from celery import Celery
app = Celery("x", broker="redis://...", backend="redis://...")
@app.task
def add(a, b): return a + b
# Appels
add.delay(1, 2)
add.apply_async(args=[1, 2], countdown=5, queue="slow")
# Récupérer un résultat
res = app.AsyncResult(task_id)
res.state # PENDING / STARTED / SUCCESS / FAILURE / RETRY
res.get(timeout=10) # bloque, à éviter dans une route HTTP
celery -A tasks worker -l info --concurrency=4
celery -A tasks beat -l info
celery -A tasks inspect active
celery -A tasks inspect registered
celery -A tasks purge # vide les queues (dev)
Celery et le reste de l'écosystème
- Flower — l'UI temps réel pour Celery. Voir l'encart en haut.
- Redis / RabbitMQ — les deux brokers recommandés. Redis = simple, rapide, parfait pour démarrer. RabbitMQ = plus robuste, meilleur pour des garanties strictes.
-
FastAPI — combo très
commun. Pattern : la route publie une tâche, renvoie 202 + un
task_id, le client poll/result/<id>. - Docker — typiquement 3 services dans le compose : ton API, un worker Celery, et le broker. Tous partagent l'image Python (juste la commande change).
- Kubernetes — le worker est un Deployment scalable horizontalement. On peut auto-scaler selon la profondeur de la queue (KEDA + Redis/RabbitMQ scaler).
- MLflow — pattern courant : une tâche Celery télécharge un modèle MLflow, fait l'inférence, renvoie le résultat. L'API reste rapide, l'inférence est isolée.
Pour aller plus loin
- Documentation officielle : docs.celeryq.dev
- First Steps : first-steps-with-celery
- Repo GitHub : github.com/celery/celery