PySpark - Module 6.0: MLlib Fundamentals
MLlib Fundamentals
This is where the course turns from moving and shaping data to learning from it. MLlib is Spark’s machine learning library. It differs from scikit-learn in the same way Spark has differed from single-machine tools throughout the course: scikit-learn trains on a single machine and the data has to fit in its memory, while MLlib trains across the cluster on a Spark DataFrame, so it keeps working when the data does not fit on one machine.
If you already know scikit-learn, the ideas carry over. You still split the data, fit a model, and score it. What changes is that every step becomes a stage in a Spark pipeline, and the work runs distributed.
Before you start: switch to environment version 4
MLlib uses Spark’s JVM engine, and on Free Edition serverless you have to opt in to it by selecting environment version 4.
- Open a notebook.
- On the right, open the Environment panel.
- Set the version to 4 and apply.
Without this, importing an MLlib transformer fails with a Py4J security error. With it, the code below runs. On serverless a trained model is capped at 100 MB, which is plenty for everything in this module. The serverless environment version 4 notes have the details.
Transformers, Estimators, and the Pipeline
MLlib has three building blocks, and almost everything you do is one of them.
- A Transformer takes a DataFrame and returns a new one with extra columns. VectorAssembler is a Transformer: it reads several columns and writes one features column.
- An Estimator has a fit method. It studies data and produces a Transformer. LogisticRegression is an Estimator: calling fit returns a trained model that can transform new data.
- A Pipeline chains stages in order. Calling fit on the Pipeline fits each Estimator in turn and passes the result down the chain. The output is a PipelineModel, which is itself a Transformer you apply to new data.
Why this matters: the same sequence of steps you fit on training data is applied, identically, to test data and to production data. You cannot accidentally prepare features one way in training and another way at scoring time.
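To make the contract concrete, here is a minimal plain-Python sketch of the three building blocks. It is not MLlib code: the class names (AddDoubled, MeanCenter, MeanCenterModel, ToyPipeline, ToyPipelineModel) are hypothetical stand-ins, and rows are plain dicts rather than a Spark DataFrame. It only illustrates the idea that fit learns something from training data and the fitted result reuses it unchanged on new data.

```python
# Plain-Python sketch of the Transformer / Estimator / Pipeline contract.
# Every class here is a hypothetical stand-in, not an MLlib class.

class AddDoubled:
    """A 'Transformer': takes rows and returns new rows with an extra column."""
    def transform(self, rows):
        return [{**r, "x_doubled": r["x"] * 2} for r in rows]


class MeanCenterModel:
    """The fitted 'Transformer' that MeanCenter.fit produces."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**r, "x_centered": r["x"] - self.mean} for r in rows]


class MeanCenter:
    """An 'Estimator': fit studies the data and returns a Transformer."""
    def fit(self, rows):
        mean = sum(r["x"] for r in rows) / len(rows)
        return MeanCenterModel(mean)


class ToyPipelineModel:
    """The fitted pipeline: a list of Transformers applied in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Fits each Estimator in turn and passes transformed rows down the chain."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):     # Estimator: fit it and keep the model
                stage = stage.fit(rows)
            fitted.append(stage)
            rows = stage.transform(rows)  # output feeds the next stage
        return ToyPipelineModel(fitted)


train = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
test = [{"x": 10.0}]

toy_model = ToyPipeline([AddDoubled(), MeanCenter()]).fit(train)
scored = toy_model.transform(test)
print(scored)  # [{'x': 10.0, 'x_doubled': 20.0, 'x_centered': 8.0}]
```

The test row is centered with the training mean (2.0), not with its own mean. That is the guarantee the paragraph above describes: whatever was learned during fit is frozen inside the fitted pipeline and applied the same way everywhere.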
Feature engineering tools
Three Transformers cover most tabular feature work:
- StringIndexer turns a string category column into numeric indexes (for example "a" and "b" become 0.0 and 1.0).
- OneHotEncoder turns those indexes into one-hot vectors, so the model does not read the categories as ordered numbers.
- VectorAssembler collects your numeric columns and encoded vectors into the single features column that every MLlib model expects.
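The sketch below mimics, in plain Python, what these three stages compute for a single row. It is an illustration, not MLlib's implementation: the helper names (fit_string_index, one_hot, assemble) are hypothetical, and it assumes MLlib's default settings, where StringIndexer gives index 0.0 to the most frequent category (ties broken alphabetically) and OneHotEncoder drops the last category (dropLast=True), so that category is encoded as all zeros.

```python
from collections import Counter


def fit_string_index(values):
    """Map each category to an index, most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


def assemble(*parts):
    """Flatten plain numbers and vectors into one feature list."""
    features = []
    for p in parts:
        features.extend(p if isinstance(p, list) else [p])
    return features


cats = ["a", "b", "a", "c", "a", "b"]
mapping = fit_string_index(cats)
print(mapping)  # {'a': 0.0, 'b': 1.0, 'c': 2.0}

# One row with x1=3.5, x2=1.2 and category "b"
row_features = assemble(3.5, 1.2, one_hot(mapping["b"], len(mapping)))
print(row_features)  # [3.5, 1.2, 0.0, 1.0]
```

Two things are worth noticing. The index a category receives depends on how often it appears in the data you fit on, so do not rely on a particular category getting a particular index. And with two categories and the default dropLast, the one-hot vector has a single slot, which is why the features column in the pipeline below has three entries rather than four. MLlib stores these as its own vector type, not as Python lists.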
Hands-on: a binary classification pipeline
We generate a small dataset so you can run this immediately, with no file to download: two numeric columns, one category column, and a binary label.
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# A small synthetic dataset
df = (spark.range(2000)
      .withColumn("x1", F.rand() * 10)
      .withColumn("x2", F.rand() * 5)
      .withColumn("cat", F.when(F.rand() > 0.5, "a").otherwise("b"))
      .withColumn("label", F.when(F.col("x1") + F.col("x2") + F.rand() > 8, 1).otherwise(0)))
# Split first, so the test set stays unseen
train, test = df.randomSplit([0.8, 0.2], seed=42)
# Feature engineering stages
indexer = StringIndexer(inputCol="cat", outputCol="cat_idx")
encoder = OneHotEncoder(inputCols=["cat_idx"], outputCols=["cat_ohe"])
assembler = VectorAssembler(inputCols=["x1", "x2", "cat_ohe"], outputCol="features")
# The model
lr = LogisticRegression(featuresCol="features", labelCol="label")
# One pipeline, fit once
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
model = pipeline.fit(train)
# Score the held-out test set
predictions = model.transform(test)
auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC").evaluate(predictions)
print("AUC:", round(auc, 4))
Run it. You should see an AUC close to 1.0, because the label is an easy function of the features. On real data you will fight for every point of AUC, but the shape of the code does not change.
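If AUC is new to you, one way to read areaUnderROC is as the probability that a randomly chosen positive row gets a higher score than a randomly chosen negative row. The plain-Python sketch below computes it that way on four hand-made rows. The function name pairwise_auc is hypothetical, and this is not how BinaryClassificationEvaluator computes the metric internally (it integrates the ROC curve, so results on real data may differ slightly); it is only meant to show what the number measures.

```python
def pairwise_auc(labels, scores):
    """Fraction of (positive, negative) pairs where the positive scores higher.

    Ties count as half a win. Assumes labels are 0 or 1 and that both
    classes appear at least once.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

auc_value = pairwise_auc(labels, scores)
print(auc_value)  # 0.75: three of the four positive/negative pairs are ranked correctly
```

An AUC of 0.5 means the scores rank positives no better than chance, and 1.0 means every positive outranks every negative. The double loop is fine for four rows but is exactly the kind of row-by-row work you would not do on a cluster, which is why the pipeline above leaves the metric to the evaluator.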
Notice what you did not do: you did not convert the DataFrame to pandas, and you did not loop over rows. The encoders, the assembler, and the model all ran as Spark stages across the cluster. Swap spark.range(2000) for a fifty-million-row table and the same code still runs.
Try it yourself
- Change the label rule and watch how the AUC moves.
- Add a second category column, and extend the StringIndexer and OneHotEncoder stages to cover it.
- Print model.stages and look at what each fitted stage is.
Key questions you can now answer
- What is the difference between a Transformer and an Estimator in MLlib?
- How does a Pipeline stop training and scoring from preparing features differently?
- Why do you one-hot encode a category instead of feeding its string index straight to the model?
- What does VectorAssembler produce, and why does every MLlib model need it?
- Why can this pipeline train on data that would not fit in scikit-learn?