À quoi ça sert

pandas est parfait jusqu'à quelques Go de données. Au-delà, il sature la RAM d'une machine. Spark résout ce problème en distribuant automatiquement le calcul sur plusieurs nœuds d'un cluster : tu écris ton code comme s'il s'agissait d'un seul DataFrame, Spark découpe en partitions et fait travailler chaque machine sur sa part.

  • ETL massif — transformer des dizaines de To de logs, de ventes ou de capteurs IoT.
  • Préparation de datasets ML — agréger, joindre, nettoyer de gros volumes pour entraîner ensuite un modèle.
  • Requêtes SQL distribuées — Spark SQL transforme un DataFrame en table requêtable en SQL, sans serveur de base de données.
  • Streaming — traiter des flux en quasi temps réel (Spark Structured Streaming, alimenté par Kafka par exemple).
pandas, Polars, PySpark — quand utiliser quoi ?

pandas : jusqu'à quelques Go, données qui tiennent en RAM, dev rapide, écosystème ML riche. Polars : pour aller plus vite sur une seule machine (10-100 Go), API très proche de pandas mais multi-cœur et écrite en Rust. PySpark : quand un seul nœud ne suffit plus — données sur S3/HDFS, traitement réparti sur plusieurs machines. Coût d'entrée plus élevé (cluster, JVM), mais scale virtuellement sans limite.

Les 3 concepts clés

  • SparkSession — le point d'entrée. C'est l'objet qui crée la connexion au cluster (ou au mode « local » sur ta machine) et à partir duquel tu lis tes données. Une seule par programme.
  • DataFrame — la structure principale, conceptuellement identique à un DataFrame pandas (lignes × colonnes nommées). En interne, c'est un ensemble de partitions réparties sur les workers.
  • Lazy execution — c'est la différence fondamentale avec pandas. Les transformations (filter, select, groupBy) ne sont pas exécutées immédiatement : elles sont accumulées dans un plan logique, qu'on n'exécute que lorsqu'une action (show, collect, write) le demande. Spark optimise alors l'ensemble avant de lancer le calcul.
Transformations vs actions

Une transformation renvoie un nouveau DataFrame (lazy) : select, filter, withColumn, groupBy, join… Tant que tu enchaînes des transformations, rien ne s'exécute. Une action déclenche le calcul réel : show(), count(), collect(), write. Si tu lances un script qui « ne fait rien » alors qu'il a un filter(), c'est normal — il te manque une action.

Un exemple d'usage

Reprenons l'exemple du fichier de ventes vu côté pandas, mais cette fois en PySpark :

python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Créer la session Spark (mode local, tous les cœurs)
spark = SparkSession.builder.appName("ventes").master("local[*]").getOrCreate()

# Charger le CSV (lazy : aucune lecture réelle pour l'instant)
df = spark.read.csv("ventes.csv", header=True, inferSchema=True)

# Filtrer + grouper + agréger (toujours lazy)
totaux = (
    df.filter(F.col("montant") > 100)
      .groupBy("produit")
      .agg(F.sum("montant").alias("total"))
      .orderBy(F.desc("total"))
)

# Action : c'est ici que Spark exécute réellement
totaux.show(10)

spark.stop()

Le code ressemble à pandas mais a deux différences clés : on passe par F.col() pour référencer une colonne, et rien ne s'exécute avant l'appel à show(). Sur 100 Mo c'est plus lent que pandas (overhead de la JVM), sur 100 Go ça devient gagnant — et sur 100 To c'est la seule option.

How-to : installer et utiliser PySpark

  1. Installer PySpark

    PySpark embarque Spark dans son wheel. Mais Spark est en Scala/Java, donc il te faut aussi Java 11 ou 17 sur ta machine (souvent déjà présent, sinon sudo apt install openjdk-17-jre sur Linux).

    bash
    uv add pyspark
    java --version   # vérifier que Java est dispo
    L'astuce pour le dev local

    Pour expérimenter, le mode local[*] fait tourner Spark sur ta machine en utilisant tous les cœurs, sans cluster. C'est ce qu'on utilise en formation et pour développer avant de déployer.

  2. Créer une SparkSession

    python
    from pyspark.sql import SparkSession
    
    spark = (
        SparkSession.builder
        .appName("mon-app")
        .master("local[*]")            # local, tous les cœurs
        .config("spark.sql.shuffle.partitions", "8")
        .getOrCreate()
    )

    Pour un cluster réel, on remplace local[*] par spark://master:7077 (Spark standalone) ou yarn (cluster Hadoop), mais le reste du code ne change pas.

  3. Charger des données

    Spark lit nativement CSV, JSON, Parquet, Avro, ORC, mais aussi directement depuis S3 (MinIO), HDFS, JDBC. Le format de prédilection en prod est Parquet : colonnes, compressé, typé.

    python
    df = spark.read.csv("f.csv", header=True, inferSchema=True)
    df = spark.read.parquet("f.parquet")
    df = spark.read.json("f.json")
    
    # Depuis MinIO/S3
    df = spark.read.parquet("s3a://bucket/path/")
  4. Explorer un DataFrame

    python
    df.show(5)             # aperçu (action)
    df.printSchema()       # types des colonnes
    df.columns            # noms des colonnes
    df.count()            # nb de lignes (action)
    df.describe().show()  # stats sur les colonnes numériques
  5. Sélectionner, filtrer, transformer

    python
    from pyspark.sql import functions as F
    
    # Sélectionner des colonnes
    df.select("produit", "montant")
    
    # Filtrer des lignes
    df.filter(F.col("montant") > 100)
    df.filter((F.col("montant") > 100) & (F.col("produit") == "A"))
    
    # Ajouter une colonne calculée
    df.withColumn("ttc", F.col("montant") * 1.20)
    
    # Renommer / supprimer
    df.withColumnRenamed("montant", "montant_ht")
    df.drop("colonne_inutile")
  6. Grouper et agréger

    python
    df.groupBy("produit").agg(
        F.sum("montant").alias("total"),
        F.avg("montant").alias("moyenne"),
        F.count("*").alias("nb"),
    ).orderBy(F.desc("total"))
  7. Joindre deux DataFrames

    python
    ventes.join(produits, on="produit_id", how="left")
    ventes.join(produits, ventes.pid == produits.id, how="inner")
    Attention au shuffle

    Un join ou un groupBy entraîne souvent un shuffle : Spark redistribue les données entre les workers selon la clé. C'est l'opération la plus coûteuse du framework. Quand c'est possible, broadcaste la petite table : df.join(F.broadcast(petit_df), ...) — ça évite le shuffle.

  8. Spark SQL : du SQL directement

    On peut « publier » un DataFrame comme table temporaire et le requêter en SQL pur :

    python
    df.createOrReplaceTempView("ventes")
    
    resultat = spark.sql("""
        SELECT produit, SUM(montant) AS total
        FROM ventes
        WHERE montant > 100
        GROUP BY produit
        ORDER BY total DESC
    """)
    
    resultat.show()

    Très pratique pour les requêtes longues ou pour porter du code SQL existant vers Spark sans tout réécrire.

  9. Sauvegarder le résultat

    python
    df.write.parquet("sortie/", mode="overwrite")
    df.write.csv("sortie.csv", header=True)
    df.write.mode("append").parquet("data/")

    En sortie, Spark écrit en plusieurs fichiers (un par partition) — c'est voulu, ça permet la parallélisation à la relecture. Pour un seul fichier, df.coalesce(1).write... mais c'est à éviter sur de gros volumes.

  10. Convertir vers/depuis pandas

    Quand le résultat tient en mémoire, on peut basculer vers pandas pour la phase finale (visu, ML) :

    python
    pdf = df.toPandas()        # Spark → pandas (ATTENTION : tout en mémoire)
    sdf = spark.createDataFrame(pdf)  # pandas → Spark
    Le piège du toPandas()

    toPandas() ramène toutes les lignes sur le driver (ta machine) — il faut que ça tienne en RAM. À n'utiliser qu'après avoir filtré/agrégé pour passer de To à quelques Mo. Même remarque pour collect().

Aide-mémoire

python (session)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.stop()
python (lecture / écriture)
spark.read.csv("f.csv", header=True, inferSchema=True)
spark.read.parquet("f.parquet")
df.write.parquet("out/", mode="overwrite")
python (transformations courantes)
df.select("a", "b")
df.filter(F.col("x") > 0)
df.withColumn("y", F.col("x") * 2)
df.groupBy("k").agg(F.sum("v").alias("total"))
df.join(other, on="id", how="left")
df.orderBy(F.desc("total"))
df.dropDuplicates(["id"])
df.dropna(); df.fillna(0)
python (actions — exécutent vraiment)
df.show(20)
df.count()
df.collect()           # liste de Row (tout en RAM driver)
df.toPandas()          # conversion (tout en RAM)
df.write.parquet("...")
python (Spark SQL)
df.createOrReplaceTempView("t")
spark.sql("SELECT * FROM t WHERE x > 0").show()

PySpark et le reste de l'écosystème

  • pandas — API très proche. En MLOps, le pattern courant est : préparation lourde en PySpark (To de données → Mo agrégés), puis bascule en pandas pour l'entraînement.
  • Jupyter — l'environnement idéal pour développer un job PySpark : on itère cellule par cellule avec df.show() à chaque étape.
  • MinIO — Spark lit nativement les buckets S3-compatibles via le préfixe s3a://. C'est le combo classique pour un data lake auto-hébergé.
  • MLflow — sait logger un modèle pyspark.ml (le module ML de Spark) avec sa signature. Pour traiter des features à grande échelle puis entraîner directement dans Spark, sans sortir vers pandas.
  • Kafka — Spark Structured Streaming s'abonne à un topic Kafka et traite les événements en micro-batches avec la même API DataFrame qu'en batch.
  • Prefect — orchestrer un job PySpark planifié, monitorer son succès/échec et déclencher des étapes suivantes (ex. invalidation de cache, ré-entraînement).
PySpark, Databricks et Delta Lake

Databricks est la plateforme commerciale fondée par les créateurs de Spark : un Spark managé clé en main, avec notebooks et orchestration intégrés. Delta Lake est leur format open source qui ajoute des transactions ACID au-dessus de Parquet — souvent évoqué avec Spark, à connaître au moins de nom.

Pour aller plus loin