PySpark
L'API Python d'Apache Spark, le moteur de calcul distribué. En une seule API tu manipules des datasets de quelques Mo comme des téraoctets répartis sur des dizaines de machines. Le standard du « big data » côté traitement.
À quoi ça sert
pandas est parfait jusqu'à quelques Go de données. Au-delà, il sature la RAM d'une machine. Spark résout ce problème en distribuant automatiquement le calcul sur plusieurs nœuds d'un cluster : tu écris ton code comme s'il s'agissait d'un seul DataFrame, Spark découpe en partitions et fait travailler chaque machine sur sa part.
- ETL massif — transformer des dizaines de To de logs, de ventes ou de capteurs IoT.
- Préparation de datasets ML — agréger, joindre, nettoyer de gros volumes pour entraîner ensuite un modèle.
- Requêtes SQL distribuées — Spark SQL transforme un DataFrame en table requêtable en SQL, sans serveur de base de données.
- Streaming — traiter des flux en quasi temps réel (Spark Structured Streaming, alimenté par Kafka par exemple).
pandas : jusqu'à quelques Go, données qui tiennent en RAM, dev rapide, écosystème ML riche. Polars : pour aller plus vite sur une seule machine (10-100 Go), API très proche de pandas mais multi-cœur et écrite en Rust. PySpark : quand un seul nœud ne suffit plus — données sur S3/HDFS, traitement réparti sur plusieurs machines. Coût d'entrée plus élevé (cluster, JVM), mais scale virtuellement sans limite.
Les 3 concepts clés
- SparkSession — le point d'entrée. C'est l'objet qui crée la connexion au cluster (ou au mode « local » sur ta machine) et à partir duquel tu lis tes données. Une seule par programme.
- DataFrame — la structure principale, conceptuellement identique à un DataFrame pandas (lignes × colonnes nommées). En interne, c'est un ensemble de partitions réparties sur les workers.
-
Lazy execution — c'est la différence
fondamentale avec pandas. Les transformations
(
filter,select,groupBy) ne sont pas exécutées immédiatement : elles sont accumulées dans un plan logique, qu'on n'exécute que lorsqu'une action (show,collect,write) le demande. Spark optimise alors l'ensemble avant de lancer le calcul.
Une transformation renvoie un nouveau DataFrame
(lazy) : select, filter, withColumn,
groupBy, join… Tant que tu enchaînes des
transformations, rien ne s'exécute. Une action
déclenche le calcul réel : show(), count(),
collect(), write. Si tu lances un script qui
« ne fait rien » alors qu'il a un filter(), c'est
normal — il te manque une action.
Un exemple d'usage
Reprenons l'exemple du fichier de ventes vu côté pandas, mais cette fois en PySpark :
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Créer la session Spark (mode local, tous les cœurs)
spark = SparkSession.builder.appName("ventes").master("local[*]").getOrCreate()
# Charger le CSV (lazy : aucune lecture réelle pour l'instant)
df = spark.read.csv("ventes.csv", header=True, inferSchema=True)
# Filtrer + grouper + agréger (toujours lazy)
totaux = (
df.filter(F.col("montant") > 100)
.groupBy("produit")
.agg(F.sum("montant").alias("total"))
.orderBy(F.desc("total"))
)
# Action : c'est ici que Spark exécute réellement
totaux.show(10)
spark.stop()
Le code ressemble à pandas mais a deux différences clés : on passe
par F.col() pour référencer une colonne, et rien ne s'exécute
avant l'appel à show(). Sur 100 Mo c'est plus lent que pandas
(overhead de la JVM), sur 100 Go ça devient gagnant — et sur 100 To c'est
la seule option.
How-to : installer et utiliser PySpark
-
Installer PySpark
PySpark embarque Spark dans son wheel. Mais Spark est en Scala/Java, donc il te faut aussi Java 11 ou 17 sur ta machine (souvent déjà présent, sinon
sudo apt install openjdk-17-jresur Linux).bashuv add pyspark java --version # vérifier que Java est dispoL'astuce pour le dev localPour expérimenter, le mode
local[*]fait tourner Spark sur ta machine en utilisant tous les cœurs, sans cluster. C'est ce qu'on utilise en formation et pour développer avant de déployer. -
Créer une SparkSession
pythonfrom pyspark.sql import SparkSession spark = ( SparkSession.builder .appName("mon-app") .master("local[*]") # local, tous les cœurs .config("spark.sql.shuffle.partitions", "8") .getOrCreate() )Pour un cluster réel, on remplace
local[*]parspark://master:7077(Spark standalone) ouyarn(cluster Hadoop), mais le reste du code ne change pas. -
Charger des données
Spark lit nativement CSV, JSON, Parquet, Avro, ORC, mais aussi directement depuis S3 (MinIO), HDFS, JDBC. Le format de prédilection en prod est Parquet : colonnes, compressé, typé.
pythondf = spark.read.csv("f.csv", header=True, inferSchema=True) df = spark.read.parquet("f.parquet") df = spark.read.json("f.json") # Depuis MinIO/S3 df = spark.read.parquet("s3a://bucket/path/") -
Explorer un DataFrame
pythondf.show(5) # aperçu (action) df.printSchema() # types des colonnes df.columns # noms des colonnes df.count() # nb de lignes (action) df.describe().show() # stats sur les colonnes numériques -
Sélectionner, filtrer, transformer
pythonfrom pyspark.sql import functions as F # Sélectionner des colonnes df.select("produit", "montant") # Filtrer des lignes df.filter(F.col("montant") > 100) df.filter((F.col("montant") > 100) & (F.col("produit") == "A")) # Ajouter une colonne calculée df.withColumn("ttc", F.col("montant") * 1.20) # Renommer / supprimer df.withColumnRenamed("montant", "montant_ht") df.drop("colonne_inutile") -
Grouper et agréger
pythondf.groupBy("produit").agg( F.sum("montant").alias("total"), F.avg("montant").alias("moyenne"), F.count("*").alias("nb"), ).orderBy(F.desc("total")) -
Joindre deux DataFrames
pythonventes.join(produits, on="produit_id", how="left") ventes.join(produits, ventes.pid == produits.id, how="inner")Attention au shuffleUn
joinou ungroupByentraîne souvent un shuffle : Spark redistribue les données entre les workers selon la clé. C'est l'opération la plus coûteuse du framework. Quand c'est possible, broadcaste la petite table :df.join(F.broadcast(petit_df), ...)— ça évite le shuffle. -
Spark SQL : du SQL directement
On peut « publier » un DataFrame comme table temporaire et le requêter en SQL pur :
pythondf.createOrReplaceTempView("ventes") resultat = spark.sql(""" SELECT produit, SUM(montant) AS total FROM ventes WHERE montant > 100 GROUP BY produit ORDER BY total DESC """) resultat.show()Très pratique pour les requêtes longues ou pour porter du code SQL existant vers Spark sans tout réécrire.
-
Sauvegarder le résultat
pythondf.write.parquet("sortie/", mode="overwrite") df.write.csv("sortie.csv", header=True) df.write.mode("append").parquet("data/")En sortie, Spark écrit en plusieurs fichiers (un par partition) — c'est voulu, ça permet la parallélisation à la relecture. Pour un seul fichier,
df.coalesce(1).write...mais c'est à éviter sur de gros volumes. -
Convertir vers/depuis pandas
Quand le résultat tient en mémoire, on peut basculer vers pandas pour la phase finale (visu, ML) :
pythonpdf = df.toPandas() # Spark → pandas (ATTENTION : tout en mémoire) sdf = spark.createDataFrame(pdf) # pandas → SparkLe piège dutoPandas()toPandas()ramène toutes les lignes sur le driver (ta machine) — il faut que ça tienne en RAM. À n'utiliser qu'après avoir filtré/agrégé pour passer de To à quelques Mo. Même remarque pourcollect().
Aide-mémoire
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.stop()
spark.read.csv("f.csv", header=True, inferSchema=True)
spark.read.parquet("f.parquet")
df.write.parquet("out/", mode="overwrite")
df.select("a", "b")
df.filter(F.col("x") > 0)
df.withColumn("y", F.col("x") * 2)
df.groupBy("k").agg(F.sum("v").alias("total"))
df.join(other, on="id", how="left")
df.orderBy(F.desc("total"))
df.dropDuplicates(["id"])
df.dropna(); df.fillna(0)
df.show(20)
df.count()
df.collect() # liste de Row (tout en RAM driver)
df.toPandas() # conversion (tout en RAM)
df.write.parquet("...")
df.createOrReplaceTempView("t")
spark.sql("SELECT * FROM t WHERE x > 0").show()
PySpark et le reste de l'écosystème
- pandas — API très proche. En MLOps, le pattern courant est : préparation lourde en PySpark (To de données → Mo agrégés), puis bascule en pandas pour l'entraînement.
-
Jupyter — l'environnement
idéal pour développer un job PySpark : on itère cellule par cellule
avec
df.show()à chaque étape. -
MinIO — Spark lit
nativement les buckets S3-compatibles via le préfixe
s3a://. C'est le combo classique pour un data lake auto-hébergé. -
MLflow — sait logger un
modèle
pyspark.ml(le module ML de Spark) avec sa signature. Pour traiter des features à grande échelle puis entraîner directement dans Spark, sans sortir vers pandas. - Kafka — Spark Structured Streaming s'abonne à un topic Kafka et traite les événements en micro-batches avec la même API DataFrame qu'en batch.
- Prefect — orchestrer un job PySpark planifié, monitorer son succès/échec et déclencher des étapes suivantes (ex. invalidation de cache, ré-entraînement).
Databricks est la plateforme commerciale fondée par les créateurs de Spark : un Spark managé clé en main, avec notebooks et orchestration intégrés. Delta Lake est leur format open source qui ajoute des transactions ACID au-dessus de Parquet — souvent évoqué avec Spark, à connaître au moins de nom.
Pour aller plus loin
- Site officiel : spark.apache.org/docs/latest/api/python
- Guide officiel PySpark : Getting Started with PySpark
- Liste des fonctions
F: pyspark.sql.functions - Delta Lake (table format ACID au-dessus de Parquet) : delta.io