PySpark - Module 6.1: Supervised Learning at Scale


Supervised Learning at Scale

In Module 6.0 you built one pipeline with one model. Here you swap models, measure them properly, and let Spark search for good settings. The pipeline you already know does not change. You only replace the final stage and add a way to compare results.

This module assumes the feature pipeline from 6.0: a StringIndexer, a OneHotEncoder, and a VectorAssembler that produce a features column. Keep the same notebook on environment version 4.

Swapping the model

Every MLlib classifier takes the same featuresCol and labelCol, so moving from one to another is a one-line change to the last stage.

from pyspark.ml import Pipeline
from pyspark.ml.classification import (
    LogisticRegression, DecisionTreeClassifier,
    RandomForestClassifier, GBTClassifier,
)

models = {
    "logreg": LogisticRegression(featuresCol="features", labelCol="label"),
    "tree":   DecisionTreeClassifier(featuresCol="features", labelCol="label"),
    "forest": RandomForestClassifier(featuresCol="features", labelCol="label"),
    "gbt":    GBTClassifier(featuresCol="features", labelCol="label", maxIter=10),
}

# indexer, encoder, and assembler are the Module 6.0 stages; only the final stage changes.
fitted = {name: Pipeline(stages=[indexer, encoder, assembler, clf]).fit(train)
          for name, clf in models.items()}

A short guide to when each one earns its place:

  • Logistic Regression is fast, and its coefficients are easy to explain. Reach for it first as a baseline.
  • Decision Tree captures non-linear splits and is readable, but a single tree tends to overfit the training data.
  • Random Forest averages many trees. It is the dependable default for tabular data.
  • Gradient Boosted Trees often score highest, at the cost of longer training and more careful tuning. MLlib's GBTClassifier supports binary labels only.

Measuring a model

A model is only as good as the metric you judge it by. MLlib gives you one evaluator per problem type.

from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator, MulticlassClassificationEvaluator,
)

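# areaUnderROC reads the rawPrediction column that every classifier above writes.
auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
# "f1" is per-class F1 averaged with weights equal to each class's share of the labels.
f1  = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")

# Score the held-out test set with each fitted pipeline.
for name, model in fitted.items():
    pred = model.transform(test)
    print(f"{name:7s}  AUC={auc.evaluate(pred):.3f}  F1={f1.evaluate(pred):.3f}")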

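Use BinaryClassificationEvaluator (metricName "areaUnderROC" or "areaUnderPR") for two-class problems, MulticlassClassificationEvaluator ("f1", "accuracy") when there are more than two classes, and RegressionEvaluator ("rmse", "r2") when the label is a number. MulticlassClassificationEvaluator also accepts two-class predictions, which is why the loop above can report F1 next to AUC.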
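To see what each metricName measures, here is a minimal pure-Python sketch of the metrics named above, written for intuition on small lists. It is not Spark's implementation: BinaryClassificationEvaluator builds a ROC curve from the rawPrediction scores and may bin them on large data, and the function names here are illustrative only. The AUC function uses the equivalent pairwise definition: the chance that a randomly chosen positive outscores a randomly chosen negative.

```python
import math


def roc_auc(labels, scores):
    """Area under ROC as P(random positive's score > random negative's score); ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of the true labels."""
    total = len(labels)
    score = 0.0
    for c in set(labels):
        tp = sum(1 for y, p in zip(labels, preds) if y == c and p == c)
        fp = sum(1 for y, p in zip(labels, preds) if y != c and p == c)
        fn = sum(1 for y, p in zip(labels, preds) if y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * (tp + fn) / total  # tp + fn = number of true labels equal to c
    return score


def rmse(labels, preds):
    """Root mean squared error."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))


def r2(labels, preds):
    """1 - (residual sum of squares / total sum of squares around the mean label)."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))       # 0.75
print(round(weighted_f1([0, 0, 1, 1], [0, 1, 1, 1]), 4))  # 0.7333
print(r2([1, 2, 3], [1, 2, 5]))                            # -1.0
```

A negative R2, as in the last line, means the model does worse than always predicting the mean label.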

Which features matter

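Tree models report how much each feature contributed to their splits. featureImportances has one entry per slot of the features vector, not per input column: a one-hot encoded column spreads across several slots. The entries are normalized to sum to 1 and rank slots by how much they reduced impurity across the trees. They do not tell you the direction of an effect or whether it is causal.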

forest_model = fitted["forest"].stages[-1]   # the fitted RandomForestClassificationModel
print(forest_model.featureImportances)       # a vector indexed by feature slot
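Because each importance belongs to a vector slot, you need the slot names that VectorAssembler records in the features column's metadata to read the vector. The helper below is a hypothetical sketch: name_importances is not an MLlib function, and it assumes the usual ml_attr layout ({"attrs": {"numeric": [...], "binary": [...]}}, each entry carrying "idx" and "name"). It falls back to slot_<i> when a slot has no recorded name, and the example column names are made up.

```python
def name_importances(ml_attr, importances):
    """Pair each feature slot with its name and sort by importance, largest first.

    ml_attr: the dict at df.schema["features"].metadata["ml_attr"] (assumed layout).
    importances: a sequence of floats, e.g. featureImportances.toArray().
    """
    slot_names = {}
    # attrs groups slots by type ("numeric", "binary", "nominal"); each entry has idx and name
    for group in ml_attr.get("attrs", {}).values():
        for attr in group:
            slot_names[attr["idx"]] = attr.get("name", f"slot_{attr['idx']}")
    ranked = sorted(enumerate(importances), key=lambda pair: pair[1], reverse=True)
    return [(slot_names.get(i, f"slot_{i}"), float(w)) for i, w in ranked]


# Toy metadata in the assumed layout, standing in for a real assembled column
example_attr = {
    "attrs": {
        "numeric": [{"idx": 0, "name": "age"}],
        "binary": [{"idx": 1, "name": "city_vec_NY"}, {"idx": 2, "name": "city_vec_LA"}],
    },
    "num_attrs": 3,
}
print(name_importances(example_attr, [0.2, 0.5, 0.3]))
# [('city_vec_NY', 0.5), ('city_vec_LA', 0.3), ('age', 0.2)]

# In the notebook (needs the fitted pipeline from above):
# pred = fitted["forest"].transform(test)
# attrs = pred.schema["features"].metadata["ml_attr"]
# ranked = name_importances(attrs, fitted["forest"].stages[-1].featureImportances.toArray())
```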

Tuning with CrossValidator

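Picking numTrees or maxDepth by hand is guessing. CrossValidator splits the training data into numFolds folds. For every parameter combination in a ParamGridBuilder grid, it fits the pipeline once per fold, scores it with the evaluator on the held-out fold, and averages those scores. The combination with the best average is then refit on the full training set.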

On serverless, CrossValidator needs a scratch location, so create a Unity Catalog Volume once and point Spark ML at it. This is also the Volume you will reuse in 6.3 to save models.

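import os

# Look up the notebook's current catalog and schema.
cat = spark.sql("SELECT current_catalog()").first()[0]
sch = spark.sql("SELECT current_schema()").first()[0]
# Create the Volume once; IF NOT EXISTS makes reruns safe.
spark.sql(f"CREATE VOLUME IF NOT EXISTS {cat}.{sch}.ml_models")
# Point Spark ML on serverless at the Volume as its scratch location.
os.environ["SPARKML_TEMP_DFS_PATH"] = f"/Volumes/{cat}/{sch}/ml_models"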

Now run the search:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

forest = RandomForestClassifier(featuresCol="features", labelCol="label")
grid = (ParamGridBuilder()
        .addGrid(forest.numTrees, [10, 30])
        .addGrid(forest.maxDepth, [3, 6])
        .build())

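cv = CrossValidator(
    estimator=Pipeline(stages=[indexer, encoder, assembler, forest]),
    estimatorParamMaps=grid,
    evaluator=auc,
    numFolds=3,
    seed=42,  # fixes the fold assignment so reruns are comparable
)
cv_model = cv.fit(train)

# avgMetrics: mean cross-validated AUC for each grid entry, in grid order
print("Best CV AUC:", round(max(cv_model.avgMetrics), 3))
# The refit best model, scored on the held-out test set
print("Test AUC:", round(auc.evaluate(cv_model.transform(test)), 3))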

cv_model.bestModel is a normal PipelineModel, fitted with the winning settings, ready to score new data.
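The selection rule CrossValidator follows can be sketched in plain Python. This is a simplified model, not Spark's code: make_grid, cross_validate, and toy_score are hypothetical names, folds here are assigned round-robin where Spark assigns rows to folds at random (controlled by seed), and higher scores are assumed to be better, as they are for AUC. It also shows the cost: with the 2 × 2 grid and numFolds=3 used above, CrossValidator fits 4 × 3 = 12 pipelines before the final refit.

```python
from itertools import product


def make_grid(param_values):
    """Cartesian product of parameter lists, in the spirit of ParamGridBuilder.addGrid(...).build()."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*(param_values[n] for n in names))]


def cross_validate(rows, param_maps, fit_and_score, num_folds=3):
    """Average a validation score over k folds for each param map; return the best map and all averages."""
    folds = [rows[i::num_folds] for i in range(num_folds)]  # round-robin split (Spark splits randomly)
    avg_metrics = []
    for params in param_maps:
        scores = []
        for k in range(num_folds):
            valid = folds[k]
            train = [r for j, fold in enumerate(folds) if j != k for r in fold]
            scores.append(fit_and_score(params, train, valid))
        avg_metrics.append(sum(scores) / num_folds)
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])  # larger is better
    return param_maps[best], avg_metrics


calls = []


def toy_score(params, train, valid):
    """Stand-in for fitting a forest on train and scoring AUC on valid; it ignores the data."""
    calls.append(params)
    return params["numTrees"] / 100 - abs(params["maxDepth"] - 6)


param_maps = make_grid({"numTrees": [10, 30], "maxDepth": [3, 6]})
best, avg = cross_validate(list(range(30)), param_maps, toy_score)
print(best)        # {'numTrees': 30, 'maxDepth': 6}
print(len(calls))  # 12; Spark would then refit the best map on all rows
```

Raising numFolds or adding a grid value multiplies that count, which is why the grid in this module stays small.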

A note on tuning libraries

You will read about Hyperopt and Optuna for hyperparameter search. Here is what works where:

  • CrossValidator is Spark’s own distributed search. It runs on Free Edition serverless and is what this module uses.
  • Hyperopt can distribute trials with SparkTrials, but that path needs the Spark JVM context, which serverless does not expose, so it runs only on classic compute.
  • Optuna runs its trials on a single machine, the driver. That is fine for a small search, but it does not distribute the trials across the cluster.

For this course, stay with CrossValidator. It is distributed, it is built in, and it runs on Free Edition serverless.

Try it yourself

  • Add forest.minInstancesPerNode to the grid and see whether it changes the winner.
  • Switch the evaluator metric from areaUnderROC to areaUnderPR and compare.
  • Sort the feature importances and name the two columns that drive the prediction.

Key questions you can now answer

  • How do you change which algorithm a pipeline uses, and what stays the same?
  • Which evaluator do you use for two classes, many classes, and a numeric target?
  • How does CrossValidator choose its best model, and why does it need a Volume on serverless?
  • Why can Hyperopt’s distributed mode not run on Free Edition, and what do you use instead?
  • What do feature importances tell you, and what do they not tell you?

Previous: Module 6.0: MLlib Fundamentals | Next: Module 6.2: Unsupervised Learning and Recommendation