PySpark - Module 6.3: ML Pipelines in Production
ML Pipelines in Production
A model that lives in the notebook where you trained it is a prototype. Production means four more things: save the model, load it somewhere else, score new data with it, and keep an eye on it as the world changes. MLlib and MLflow cover all four, and none of it leaves Databricks.
Keep working on environment version 4, and reuse the Unity Catalog Volume from 6.1. On serverless, saving and loading models always goes through a Volume.
import os
cat = spark.sql("SELECT current_catalog()").first()[0]
sch = spark.sql("SELECT current_schema()").first()[0]
spark.sql(f"CREATE VOLUME IF NOT EXISTS {cat}.{sch}.ml_models")
volume = f"/Volumes/{cat}/{sch}/ml_models"
os.environ["SPARKML_TEMP_DFS_PATH"] = volume
Saving and loading a model
A fitted PipelineModel saves to the Volume as a folder. The whole pipeline goes with it: the indexers, the encoder, the assembler, and the trained model. Whoever loads it gets the exact same feature steps, so there is no way to score with different preparation than you trained with.
from pyspark.ml import PipelineModel
model.write().overwrite().save(f"{volume}/credit_model")
loaded = PipelineModel.load(f"{volume}/credit_model")
Batch inference
Batch inference is the everyday production job: load the saved model, point it at a fresh batch of data, and write the scores out. Because the model is a Transformer, scoring a million rows is the same call as scoring ten.
scored = loaded.transform(new_data)
scored.select("prediction", "probability").show(5)
The new batch only needs the same input columns the pipeline was trained on. The encoders and the assembler rebuild the features column on their own.
Tracking experiments with MLflow
Once you train more than one model, you need a record of what you tried and how it did. MLflow keeps that record. Each run stores the parameters you logged, the metrics you logged, and the model itself, and they show up in the Experiments tab next to your notebook.
import mlflow
with mlflow.start_run() as run:
mlflow.log_param("model", "logreg")
mlflow.log_metric("auc", round(auc, 4))
mlflow.spark.log_model(model, "model", dfs_tmpdir=volume)
The dfs_tmpdir=volume argument is the serverless detail: MLflow stages the Spark model through the Volume, the same place SparkML writes to. Without it, logging a Spark model on serverless fails.
Watching for drift
A model is trained on a snapshot of the world, and the world moves. Data drift is when the incoming data starts to look different from the training data: a feature’s average shifts, a new category appears, a range widens. The model still runs, but its predictions quietly get worse.
You do not need a special tool to start. Compare the new batch against the training data with the Spark aggregations you already know.
from pyspark.sql import functions as F
for col in ["x1", "x2"]:
t = train.select(F.mean(col).alias("mean"), F.stddev(col).alias("std")).first()
n = new_data.select(F.mean(col).alias("mean"), F.stddev(col).alias("std")).first()
print(f"{col}: train mean={t['mean']:.2f} std={t['std']:.2f} | new mean={n['mean']:.2f} std={n['std']:.2f}")
When the new numbers drift away from the training numbers, that is your signal to look closer, and often to retrain. The same idea scales up to proper monitoring: track these summaries over time, and alert when they move past a threshold.
Try it yourself
- Save a model, load it in a fresh cell, and confirm the loaded model scores the same rows the same way.
- Log two models to MLflow with different settings and compare their AUC in the Experiments tab.
- Shift
new_data(add a constant tox1) and watch the drift check pick it up.
Key questions you can now answer
- Why does saving the whole
PipelineModel, not just the final estimator, matter? - What is batch inference, and why is scoring a million rows the same code as scoring ten?
- What does MLflow record per run, and why does logging a Spark model on serverless need
dfs_tmpdir? - What is data drift, and how can you detect it with plain Spark aggregations?
- Where do saved models and staged artifacts have to live on Free Edition serverless, and why?
Previous: Module 6.2: Unsupervised Learning and Recommendation | Next: Module 7: Capstone Project