Building a Predictive Maintenance Model with Python and Scikit-learn
In modern industrial and software operations, the shift from reactive ("fix it when it breaks") and preventive ("fix it every $N$ hours") maintenance to predictive maintenance (PdM) represents a significant competitive advantage. PdM leverages data analysis and machine learning to detect anomalies in operation and predict defects or failures before they occur.
For a Chief Technology Officer (CTO) or software engineer, implementing a PdM strategy is not just a data science exercise; it is a complex systems engineering challenge. It involves building a robust, scalable data pipeline, selecting and validating appropriate models, and integrating model inference into operational workflows.
This article provides a practical, code-first guide to building a PdM classification model using Python, Pandas, and Scikit-learn. We will focus on the engineering decisions, data transformations, and architectural patterns required to move from raw sensor data to a deployable model that can predict a failure event.
The PdM System Architecture
Before writing any ML code, we must design the data flow. A production-grade PdM system is an MLOps challenge that typically involves several components:
- Data Ingestion: Raw data (e.g., vibration, temperature, pressure, error logs) is collected from assets (e.g., IoT sensors, application logs). This data is often streamed via protocols like MQTT or Kafka.
- Data Storage & ETL: Data lands in a data lake (e.g., S3, ADLS) or a time-series database (e.g., InfluxDB). A processing layer (e.g., Spark, Dask, or Pandas for smaller datasets) cleans, aggregates, and merges sensor data with maintenance logs and failure records.
- Feature Engineering: This is the most critical step. Raw time-series data is transformed into meaningful features (e.g., rolling averages, standard deviations, lag features) that capture the "state" of an asset.
- Model Training & Registry: A training pipeline ingests features, trains one or more candidate models, and evaluates them. The finalized model (e.g., a serialized .pkl file) and its metadata are stored in a model registry (e.g., MLflow, Vertex AI Registry).
- Inference & Deployment:
- Batch Prediction: A scheduled job (e.g., Airflow, Kubernetes CronJob) runs daily, scoring all assets and generating a "risk list" for maintenance teams.
- Real-time Inference: An API endpoint (e.g., FastAPI, Flask) serves the model, allowing real-time systems to check an asset's health on demand.
For this guide, we will focus on the core components: Feature Engineering, Model Training, and Inference, using a simulated dataset.
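To make the batch-prediction path concrete, here is a minimal sketch of a daily scoring job. The file paths, the features/latest_snapshot.parquet input, and the daily_risk_list.csv output are assumptions for illustration, not artifacts produced later in this guide.
import joblib
import pandas as pd
# Hypothetical batch-scoring job, e.g. triggered daily by Airflow or a Kubernetes CronJob
pipeline = joblib.load('pdm_model_pipeline.joblib')
latest_features = pd.read_parquet('features/latest_snapshot.parquet')  # assumed pre-computed features
# Score every asset and rank by predicted failure probability
feature_cols = [c for c in latest_features.columns if c not in ('asset_id', 'timestamp')]
latest_features['failure_probability'] = pipeline.predict_proba(latest_features[feature_cols])[:, 1]
risk_list = latest_features[['asset_id', 'failure_probability']].sort_values('failure_probability', ascending=False)
# The "risk list" handed to maintenance teams
risk_list.to_csv('daily_risk_list.csv', index=False)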
Data Preparation and Labeling
Our goal is to predict failure. This requires two primary data sources:
- Time-series sensor data: Continuous readings from the asset.
- Failure logs: A discrete list of failure events and their timestamps.
The most important engineering decision is defining the label. We are not predicting failure at the exact moment it happens; we are predicting it in advance. We must define a prediction window (e.g., "Will this asset fail within the next 24 hours?").
Let's assume we have sensors.csv and failures.csv. We'll use Pandas to merge and label our data.
import pandas as pd
# Load sample data
# sensors.csv: [timestamp, asset_id, sensor_1, sensor_2, sensor_3]
# failures.csv: [timestamp, asset_id, failure_type]
try:
sensors = pd.read_csv('sensors.csv', parse_dates=['timestamp'])
failures = pd.read_csv('failures.csv', parse_dates=['timestamp'])
except FileNotFoundError:
print("Sample data files not found. Using placeholder dataframes.")
# Create placeholder data for demonstration
sensors = pd.DataFrame({
'timestamp': pd.to_datetime(pd.date_range(start='2023-01-01', periods=1000, freq='H')),
'asset_id': 1,
'sensor_1': [100 + i/100 + (i//800)*50 for i in range(1000)],
'sensor_2': [50 - i/200 + (i//800)*20 for i in range(1000)]
})
failures = pd.DataFrame({
'timestamp': pd.to_datetime(['2023-02-12T15:00:00']),
'asset_id': 1,
'failure_type': 'Component B'
})
# --- Label Engineering ---
# Define our prediction window: e.g., predict failure 24 hours in advance
prediction_window = pd.Timedelta('24 hours')
# Initialize labels to 0 (No Failure)
sensors['failure_imminent'] = 0
# Iterate over each failure event to create labels
# This loop is an O(n*m) operation and can be slow.
# For large datasets, use window functions or merge_asof (a sketch follows this code block).
for _, fail_row in failures.iterrows():
asset = fail_row['asset_id']
fail_time = fail_row['timestamp']
# Define the start of the "at-risk" window
window_start = fail_time - prediction_window
# Find all sensor readings for that asset within the window
mask = (
(sensors['asset_id'] == asset) &
(sensors['timestamp'] >= window_start) &
(sensors['timestamp'] < fail_time)
)
sensors.loc[mask, 'failure_imminent'] = 1
print(f"Total data points: {len(sensors)}")
print(f"Imminent failure points (Label=1): {sensors['failure_imminent'].sum()}")
print(sensors.tail())
This labeling strategy transforms the problem into a standard binary classification task. Note the high class imbalance; we will have many more 0 (healthy) samples than 1 (failing) samples.
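As noted in the code comments, the labeling loop above is O(n*m); for large datasets the same labels can be produced in a vectorized way with pd.merge_asof. A minimal sketch, assuming the same sensors and failures DataFrames and the prediction_window defined above:
# Sort both frames by time (merge_asof requires sorted keys)
sensors_sorted = sensors.sort_values('timestamp')
failures_sorted = failures.sort_values('timestamp')
# For each sensor reading, find the next failure of the same asset (looking forward in time)
merged = pd.merge_asof(
    sensors_sorted,
    failures_sorted[['timestamp', 'asset_id']].rename(columns={'timestamp': 'next_failure'}),
    left_on='timestamp',
    right_on='next_failure',
    by='asset_id',
    direction='forward'
)
# Label = 1 when the next failure falls within the prediction window
time_to_failure = merged['next_failure'] - merged['timestamp']
merged['failure_imminent'] = (
    time_to_failure.gt(pd.Timedelta(0)) & time_to_failure.le(prediction_window)
).astype(int)
This should yield the same failure_imminent labels as the loop, without iterating over failure events in Python.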
Feature Engineering for Time-Series Data
A model rarely learns from raw sensor values alone. We must create features that describe the behavior of the sensors over time. Rolling-window statistics are among the most effective features for PdM.
We'll calculate the mean, standard deviation, and max value for each sensor over different time windows (e.g., 6 hours, 12 hours).
import pandas as pd
# Assuming 'sensors' DataFrame from the previous step
# Set timestamp as index for easier time-based operations
sensors = sensors.set_index('timestamp').sort_index()
# Define sensor columns and window sizes (in hours)
sensor_cols = ['sensor_1', 'sensor_2']
window_sizes = ['6H', '12H', '24H']
# Group by asset to prevent windows from crossing over different assets
grouped = sensors.groupby('asset_id')
feature_dfs = [sensors] # Start with the original data
for window in window_sizes:
print(f"Calculating features for window: {window}")
# Calculate rolling mean
rolling_mean = grouped[sensor_cols].rolling(window=window).mean()
rolling_mean = rolling_mean.rename(columns={col: f'{col}_mean_{window}' for col in sensor_cols})
# Calculate rolling standard deviation
rolling_std = grouped[sensor_cols].rolling(window=window).std()
rolling_std = rolling_std.rename(columns={col: f'{col}_std_{window}' for col in sensor_cols})
# Calculate rolling max
rolling_max = grouped[sensor_cols].rolling(window=window).max()
rolling_max = rolling_max.rename(columns={col: f'{col}_max_{window}' for col in sensor_cols})
feature_dfs.extend([rolling_mean, rolling_std, rolling_max])
# Combine all features by joining on (asset_id, timestamp) so rows stay
# aligned even when there are multiple assets (positional concat can misalign)
features_df = sensors.reset_index()
for rolled in feature_dfs[1:]:
    features_df = features_df.merge(rolled.reset_index(), on=['asset_id', 'timestamp'], how='left')
# Drop rows with NaN values generated by the rolling windows
features_df = features_df.dropna().reset_index(drop=True)
print("Feature-engineered DataFrame shape:", features_df.shape)
print(features_df.columns)
Our DataFrame now contains not just the current sensor reading, but also its recent history, providing rich context for the model.
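Rolling statistics can be complemented with the lag features mentioned in the architecture section. The sketch below assumes the data is sampled hourly, so shifting by rows approximates shifting by hours; the lag sizes and column names are illustrative.
# Hypothetical lag and difference features per asset, added to features_df
for col in ['sensor_1', 'sensor_2']:
    for lag in [1, 6]:
        # Sensor value `lag` readings earlier for the same asset
        features_df[f'{col}_lag_{lag}'] = features_df.groupby('asset_id')[col].shift(lag)
        # Change since `lag` readings ago (a simple degradation-rate proxy)
        features_df[f'{col}_delta_{lag}'] = features_df[col] - features_df[f'{col}_lag_{lag}']
# Shifting introduces NaNs at the start of each asset's history
features_df = features_df.dropna().reset_index(drop=True)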
Model Selection, Training, and Evaluation
For tabular, structured data like this, complex Deep Learning models (e.g., LSTMs) are often outperformed by tree-based ensembles like Random Forest or Gradient Boosting (XGBoost, LightGBM). These models are:
- Highly performant on tabular data.
- Robust to outliers and unscaled features (though scaling is still good practice).
- Interpretable (we can extract feature importances).
Time-Series Aware Splitting
We cannot randomly shuffle time-series data for training and testing. This would cause data leakage, as the model would be trained on data from the future and tested on data from the past. We must split our data chronologically.
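If you need cross-validation rather than a single hold-out split, Scikit-learn's TimeSeriesSplit enforces this chronological ordering. A minimal sketch, assuming a feature matrix X ordered by time (it is built in the training section below):
from sklearn.model_selection import TimeSeriesSplit
# Each fold trains on an initial chronological slice and validates on the slice that follows,
# so the model is never evaluated on data older than its training data.
tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    print(f"Fold {fold}: train rows 0-{train_idx[-1]}, validate rows {val_idx[0]}-{val_idx[-1]}")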
Handling Class Imbalance
Since failures are rare, our dataset is highly imbalanced. We can handle this using:
- Resampling: Oversampling the minority class (e.g., SMOTE) or undersampling the majority class.
- Model-level weighting: Using the class_weight parameter in Scikit-learn models to penalize misclassifications of the minority class more heavily.
We will use the class_weight='balanced' parameter, which is a simple and effective approach.
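For intuition, the 'balanced' setting weights each class by n_samples / (n_classes * n_samples_in_class). A quick illustration with hypothetical counts:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
# Hypothetical labels: 950 healthy samples, 50 imminent-failure samples
y_example = np.array([0] * 950 + [1] * 50)
weights = compute_class_weight(class_weight='balanced', classes=np.array([0, 1]), y=y_example)
print(dict(zip([0, 1], weights)))  # {0: ~0.53, 1: 10.0} - minority-class errors are weighted ~19x more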
Training Pipeline
We'll use Scikit-learn's Pipeline object to chain preprocessing (scaling) and the model.
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, PrecisionRecallDisplay
import matplotlib.pyplot as plt
# Define features (X) and target (y)
target = 'failure_imminent'
# Exclude non-feature columns
features = [col for col in features_df.columns if col not in [target, 'asset_id', 'timestamp']]
X = features_df[features]
y = features_df[target]
# --- Time-Series Split ---
# We split based on time, not randomly.
# Let's use the first 80% of data for training, 20% for testing.
split_index = int(len(X) * 0.8)
X_train, X_test = X.iloc[:split_index], X.iloc[split_index:]
y_train, y_test = y.iloc[:split_index], y.iloc[split_index:]
print(f"Train shapes: X={X_train.shape}, y={y_train.shape}")
print(f"Test shapes: X={X_test.shape}, y={y_test.shape}")
print(f"Test set failure rate: {y_test.mean():.2%}")
# --- Build the Pipeline ---
# 1. StandardScaler: Scales features
# 2. RandomForestClassifier: Our model
# class_weight='balanced' is CRITICAL for imbalanced data.
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', RandomForestClassifier(
n_estimators=100,
random_state=42,
class_weight='balanced',
n_jobs=-1
))
])
# --- Train the Model ---
print("Training the model...")
pipeline.fit(X_train, y_train)
# --- Evaluate the Model ---
print("Evaluating the model...")
y_pred = pipeline.predict(X_test)
# --- Performance Metrics ---
# For PdM, accuracy is useless. We care about Precision and Recall.
# Precision: Of all "failure" predictions, how many were correct? (Minimizes false positives / unnecessary maintenance)
# Recall: Of all actual failures, how many did we catch? (Minimizes false negatives / catastrophic failures)
# We MUST optimize for HIGH RECALL.
print("\n--- Classification Report ---")
print(classification_report(y_test, y_pred))
print("\n--- Confusion Matrix ---")
# [[True Negative, False Positive],
# [False Negative, True Positive]]
cm = confusion_matrix(y_test, y_pred)
print(cm)
# Plot Precision-Recall Curve
fig, ax = plt.subplots(figsize=(8, 6))
PrecisionRecallDisplay.from_estimator(pipeline, X_test, y_test, ax=ax)
ax.set_title("Precision-Recall Curve")
plt.show()
#
# --- Feature Importance ---
# Get feature importances from the model in the pipeline
importances = pipeline.named_steps['model'].feature_importances_
feature_importance_df = pd.DataFrame({
'feature': features,
'importance': importances
}).sort_values(by='importance', ascending=False)
print("\n--- Top 10 Features ---")
print(feature_importance_df.head(10))
The Classification Report and Confusion Matrix are your primary evaluation artifacts. Your goal is to maximize the True Positive count and minimize the False Negative count (missed failures). You will likely have to accept some False Positives (unnecessary maintenance) as a trade-off.
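One practical lever for this trade-off is the decision threshold. Instead of the default 0.5 cut-off, you can lower the threshold on predict_proba to raise Recall at the cost of Precision. A minimal sketch, reusing pipeline, X_test, and y_test from the training code above; the candidate thresholds are illustrative:
from sklearn.metrics import precision_score, recall_score
# Sweep candidate thresholds and inspect the precision/recall trade-off
probs = pipeline.predict_proba(X_test)[:, 1]
for threshold in [0.5, 0.3, 0.2, 0.1]:
    preds = (probs >= threshold).astype(int)
    p = precision_score(y_test, preds, zero_division=0)
    r = recall_score(y_test, preds, zero_division=0)
    print(f"threshold={threshold:.2f}  precision={p:.2f}  recall={r:.2f}")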
Deployment and Operationalization
A trained Pipeline object encapsulates your entire logic (scaling + model). It can be easily serialized and deployed.
Model Serialization
We use joblib to save our trained pipeline object to disk.
import joblib
# Save the entire pipeline
model_filename = 'pdm_model_pipeline.joblib'
joblib.dump(pipeline, model_filename)
print(f"Model saved to {model_filename}")
Creating a Prediction API (FastAPI)
This serialized model can be loaded into a lightweight web server to create a prediction API. FastAPI is an excellent choice due to its high performance and automatic data validation with Pydantic.
File: main.py
import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
# Define the input data structure
# This MUST match the features the model was trained on
class SensorFeatures(BaseModel):
sensor_1: float
sensor_2: float
sensor_1_mean_6H: float
sensor_1_std_6H: float
sensor_1_max_6H: float
sensor_1_mean_12H: float
# ... (add the remaining rolling-window feature fields)
sensor_2_max_24H: float
# Create the FastAPI app
app = FastAPI(
title="Predictive Maintenance API",
description="API for predicting imminent asset failure"
)
# Load the model once, at application startup
pipeline = None

@app.on_event("startup")
async def load_model():
    global pipeline
    try:
        pipeline = joblib.load('pdm_model_pipeline.joblib')
        print("Model loaded successfully.")
    except FileNotFoundError:
        print("ERROR: Model file 'pdm_model_pipeline.joblib' not found.")
@app.post("/predict")
async def predict(features: SensorFeatures):
"""
Predicts the probability of imminent failure based on sensor features.
Receives a single observation of features and returns a prediction.
"""
    if pipeline is None:
        raise HTTPException(status_code=500, detail="Model not loaded")
# Convert Pydantic model to DataFrame
# The model expects a 2D array (or DataFrame)
data = pd.DataFrame([features.dict()])
# Ensure column order matches training
# (A more robust API would fetch feature names from the model object)
    try:
        # Get class probabilities
        probabilities = pipeline.predict_proba(data)
        # Probability of class 1 (failure); cast to a native float for JSON serialization
        failure_probability = float(probabilities[0][1])
        # Get class prediction
        prediction = int(pipeline.predict(data)[0])  # 0 or 1
        return {
            "prediction": prediction,
            "failure_probability": failure_probability
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
To run this API: uvicorn main:app --reload
This API endpoint provides an operational mechanism to check an asset's health. A monitoring system can now send its feature-engineered data to POST /predict and receive a real-time risk score, triggering alerts or work orders.
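As a quick way to exercise the endpoint, a client or monitoring system can POST a JSON payload whose fields match the SensorFeatures model. The values below are placeholders, and the payload must include every feature field defined in the model:
import requests
# Hypothetical client call; fill in all remaining feature fields expected by SensorFeatures
payload = {
    "sensor_1": 105.2,
    "sensor_2": 48.7,
    "sensor_1_mean_6H": 104.9,
    "sensor_1_std_6H": 0.4,
    "sensor_1_max_6H": 105.6,
    "sensor_1_mean_12H": 104.1,
    # ... remaining rolling-window features ...
    "sensor_2_max_24H": 49.3,
}
response = requests.post("http://localhost:8000/predict", json=payload)
print(response.json())  # e.g. {"prediction": 0, "failure_probability": 0.07}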
Conclusion
We have successfully engineered a complete predictive maintenance model, from raw data to a deployable API.
For CTOs and engineering leads, the key takeaways are:
- PdM is a Data Engineering Problem: Success is less about complex model algorithms and more about high-quality data ingestion, robust labeling strategies, and meaningful feature engineering.
- Start with Simple, Interpretable Models: A RandomForestClassifier with class_weight='balanced' and well-engineered features is a powerful, production-ready baseline that is far more interpretable than a neural network.
- Optimize for the Right Metric: Do not use "accuracy." Focus on Recall and the Precision-Recall trade-off, balancing the cost of a missed failure (False Negative) against the cost of unnecessary maintenance (False Positive).
- Design for Deployment: Build your model within a Scikit-learn Pipeline to encapsulate preprocessing. This makes serialization and deployment via a simple API (like FastAPI) trivial, connecting your model to the rest of your operational software stack.