MLOps para modelos de predicción de abandono

MLOps para modelos de predicción de abandono

En cualquier negocio basado en suscripciones o con ingresos recurrentes, la pérdida de clientes es la principal causa de estancamiento. Adquirir un nuevo cliente es mucho más costoso que retener a uno existente. Si bien los equipos de análisis de negocios pueden reportar la pérdida de clientes histórica, la, lo que facilita la intervención proactiva.

Esto no es un ejercicio para crear un cuaderno de ciencia de datos. Un modelo de predicción de abandono de alta calidad es un sistema de software complejo y en constante evolución que requiere una arquitectura sólida, operaciones MLOps disciplinadas y una estrategia de implementación clara.

Este artículo proporciona un plan técnico y completo para la construcción, implementación y mantenimiento de un modelo de predicción de abandono. Nos centraremos en las decisiones arquitectónicas, la implementación de la canalización y los desafíos operativos que enfrentará, más allá de la teoría del modelo.

El plano arquitectónico

La falla más común es un modelo que funciona bien en un entorno Jupyter, pero no puede integrarse en el negocio. Un sistema de producción debe ser fiable, auditable y escalable.

Su arquitectura debe separarel procesamiento de datos, el entrenamiento de modelos, y la inferencia de modelos.

Servicios de Ingeniería de Productos

Colabore con nuestros gestores de proyectos internos, ingenieros de software y probadores de calidad para desarrollar su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks

Un plano arquitectónico bien desarrollado se ve así:

  1. Ingesta y transformación de datos: Los datos brutos (registros de aplicaciones, eventos de facturación, tickets de soporte) se almacenan en un lago de datos (p. ej., S3, GCS) o en un almacén (p. ej., Snowflake, BigQuery, Redshift).
  2. Ingeniería de características: Un trabajo programado (utilizando dbt, Spark o un orquestador como Airflow/Prefect) se ejecuta diariamente para transformar los datos brutos en un almacén de características o en una simple "tabla de entrada de modelo". Esta tabla es la única fuente de información para el entrenamiento y la inferencia.
  3. Pila de entrenamiento: Un trabajo separado y programado (p. ej., un DAG semanal de Airflow) recupera los datos de entrenamiento del almacén de características, realiza la selección del modelo y entrena el candidato de modelo final.
  4. Registro de modelos: El modelo entrenado, sus métricas de rendimiento y sus artefactos serializados se versionan y registran en un registro de modelos (p. ej., MLflow, Vertex AI Registry, SageMaker Model Registry). Esto es crucial para la gobernanza y el deshacer.
  5. Servicio de inferencia:
    • Lote (Offline): Un trabajo diario carga el modelo "de producción" del registro, calcula las probabilidades de abandono para todos los clientes activos y escribe estas probabilidades en una base de datos para el equipo de Customer Success.
    • Tiempo real (Online): Una API contenedorizada (p. ej., FastAPI, Flask) carga el modelo y expone un /predict endpoint. Esto permite que otros servicios obtengan predicciones instantáneas (p. ej., "¿Deberíamos mostrar esta oferta de descuento a este usuario ahora mismo?").
  6. Monitoreo: Los paneles y las alertas supervisan el desplazamiento de datos (¿están cambiando las entradas?) y el desplazamiento del modelo (¿está disminuyendo el rendimiento?).

Fase 1: Ingeniería de características y la variable objetivo

Esta es la fase más crítica. "Basura entra, basura sale". Un modelo simple con excelentes características siempre superará a un modelo complejo con características deficientes.

Definir la Variable Objetivo (El "Evento de Abandono")

Primero, obtenga una definición precisa y sin ambigüedades de "churn" de la empresa.

  • ¿Es una cancelación activa?
  • ¿Es una incumplimiento de renovación de un contrato?
  • ¿Es una inactividad prolongada?

A continuación, defina su ventana de predicción. Un objetivo común y muy práctico es:

target = 1 si el cliente abandonó el servicio dentro de los 30 días posteriores a la fecha en que se calcularon las características.

target = 0 de lo contrario.

Evitar fugas de datos

El error cardinal en el modelado de series temporales esla filtración de datos—utilizar información de tus datos de entrenamiento que no habría estado disponible en el momento de la predicción.

Ejemplo: Calculando avg_session_length durante un período de 90 días.

  • Incorrect (Referencia a un momento específico): Para un usuario el 1 de junio, utiliza datos del 1 de junio al 30 de agosto. Estás mirando hacia el futuro.
  • Correcto (Referencia a un momento específico): Para un usuario el 1 de junio, solo utiliza datos del 1 de marzo al 31 de mayo.

Su proceso de ingeniería de características debe garantizar rigurosamente la exactitud en el momento.

Ejemplos de funciones concretas

Aquí se presentan las categorías de funciones comunes. Su consulta para crear esta "tabla de entrada de modelo" será la pieza de código SQL o Spark más compleja del proyecto.

Bene, posso aiutarti a comprendere meglio questa query SQL. Ecco una spiegazione passo dopo passo: **Struttura della Query** La query è una query `SELECT` che estrae dati da una tabella (non specificata nell'esempio, ma presumiamo esista). Include clausole `WHERE` per filtrare i risultati, e funzioni aggregate come `COUNT`. **Spiegazione Dettagliata** 1. **`SELECT` Clause:** * `COUNT(DISTINCT column_name)`: Questa parte della query conta il numero di valori *distinti* (unici) nella colonna specificata. "DISTINCT" è cruciale perché impedisce la contazione dello stesso valore più volte. * `column_name`: Questo è il nome della colonna su cui si desidera effettuare il conteggio. 2. **`FROM` Clause:** * `[nome_tabella]`: Indica la tabella da cui estrarre i dati. L'esempio non specifica il nome, ma è fondamentale per il funzionamento della query. 3. **`WHERE` Clause:** * `condition`: Questa clausola filtra le righe della tabella prima di eseguire il conteggio. Le condizioni nella clausola `WHERE` definiscono i criteri per l'inclusione di righe nel conteggio. **Esempio di Interpretazione (ipotetico)** Supponiamo che la tabella si chiami `customers` e abbia le seguenti colonne: * `customer_id` (INT, chiave primaria) * `payment_method` (VARCHAR, es: "credit card", "PayPal", "bank transfer") * `is_annual_plan` (BOOLEAN, 1 se abbonato annualmente, 0 altrimenti) * `payment_status` (VARCHAR, es: "paid", "pending", "failed") Con questa tabella, una query simile potrebbe essere: ```sql SELECT COUNT(DISTINCT payment_method) FROM customers WHERE payment_status = 'paid' AND is_annual_plan = 1; ``` **Cosa fa questa query:** * Conta il numero di metodi di pagamento *distinti* (unici) per i clienti. * Filtra i risultati per includere solo le righe in cui: * `payment_status` è uguale a 'paid' (cioè, i pagamenti sono stati effettuati con successo). * `is_annual_plan` è uguale a 1 (cioè, i clienti sono abbonati al piano annuale). **In sintesi:** La query fornisce un conteggio dei metodi di pagamento unici utilizzati da clienti che hanno un abbonamento annuale e hanno effettuato pagamenti. **Come usare questa informazione** 1. **Comprendi la struttura della tua tabella:** Identifica quali sono le colonne nella tua tabella e cosa rappresentano. 2. **Adatta la query:** Modifica i nomi delle colonne e i valori nella clausola `WHERE` per adattarli alla struttura della tua tabella e alle specifiche della tua query. 3. **Interpreta i risultati:** Il risultato della query ti dirà il numero di metodi di pagamento distinti utilizzati dai clienti che soddisfano i criteri specificati nella clausola `WHERE`. Questo può essere utile per diverse analisi, come: * Capire quali sono i metodi di pagamento più popolari tra gli abbonati annuali. * Confrontare l'uso di diversi metodi di pagamento tra diversi segmenti di clienti. Spero che questa spiegazione dettagliata ti sia utile! Se hai un esempio specifico di tabella e di cosa vuoi ottenere, posso fornirti una query più precisa.

Fase 2: Canal de entrenamiento del modelo

Para datos estructurados y tabulares, los modelos complejos de aprendizaje profundo rara vez son la mejor opción. Árboles de decisión potenciados por gradiente (GBDT) son el algoritmo dominante para esta tarea, con XGBoost y LightGBM que representan el estado del arte. Una regresión logística más simple es una excelente opción de referencia, especialmente si el negocio exige una alta interpretabilidad.es una excelente base, especialmente si el negocio exige una alta interpretabilidad.

El Pipeline de Scikit-learn

No escribas código de preprocesamiento independiente. Aplica toda tu lógica de preprocesamiento y modelado en un objeto Pipeline. Esto evita la desalineación de datos entre el entrenamiento y la inferencia, ya que las mismas transformaciones (imputación, escalado) se guardan con el modelo.

Aquí hay un ejemplo listo para la producción que utiliza scikit-learn's Pipeline y ColumnTransformer.

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier

# --- 1. Define feature types ---
# Assume X_train is a pandas DataFrame loaded from your feature store
numeric_features = ['days_since_last_login', 'session_length_avg_90d', 'support_tickets_opened_30d']
categorical_features = ['plan_type', 'user_region']

# --- 2. Create preprocessing transformers ---
# Pipeline for numeric features: Impute missing values with the median, then scale
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Pipeline for categorical features: Impute missing with a constant, then one-hot encode
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# --- 3. Combine transformers with ColumnTransformer ---
# This applies the correct transformer to the correct column
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough' # Pass through any columns not specified
)

# --- 4. Handle Imbalanced Data ---
# Churn rates are low (e.g., 2%). Simply using accuracy is useless.
# We must either oversample (e.g., SMOTE) or use class weighting.
# XGBoost's `scale_pos_weight` is highly effective and computationally cheaper than SMOTE.
# scale_pos_weight = count(negative_class) / count(positive_class)
y_train = # ... your target variable (0s and 1s)
scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()

# --- 5. Create the full model pipeline ---
model = XGBClassifier(
    objective='binary:logistic',
    eval_metric='aucpr',  # Area Under Precision-Recall Curve: The best metric for imbalanced data
    scale_pos_weight=scale_pos_weight,
    n_estimators=200,
    learning_rate=0.05,
    use_label_encoder=False
)

# This final 'churn_pipeline' object is what you will save
churn_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('model', model)
])

# --- 6. Train ---
# X_train, y_train are your features and target from a point-in-time snapshot
churn_pipeline.fit(X_train, y_train)

# --- 7. Evaluate ---
# X_test, y_test must be from a *later* time period than training data
from sklearn.metrics import classification_report, precision_recall_curve, auc

y_probs = churn_pipeline.predict_proba(X_test)[:, 1]
precision, recall, _ = precision_recall_curve(y_test, y_probs)
print(f"Model AUPRC: {auc(recall, precision)}")
print(classification_report(y_test, churn_pipeline.predict(X_test)))

Validación del modelo: División basada en el tiempo

No pueden utilizar un train_test_split estándar.

Debe validar en un conjunto de datos de prueba independiente del futuro..

  • Estrategia: Capacitar de enero a marzo, validar en abril. O, capacitar en 2023, validar en el primer trimestre de 2024.
  • Métricas: Centrarse en Precisión, Tasa de recuperación, Puntuación F1, y Área bajo la curva de precisión-recuperación (AUPRC). La precisión es irrelevante. La empresa quiere saber: "De los 100 usuarios que predijiste que abandonarían (Precisión), ¿cuántos realmente lo hicieron? (Recuperación)".

Fase 3: Implementación y MLOps

Un artefacto de modelo entrenado (.pkl o .joblib) es inútil por sí solo. Debe integrarse en un sistema para el entrenamiento, el control de versiones y la ejecución..pklo.joblib) por sí solo es inútil. Debe integrarse en un sistema para la formación, el control de versiones y la prestación de servicios.

Servicios de Ingeniería de Productos

Trabaje con nuestros gestores de proyectos, ingenieros de software y testers de calidad internos para desarrollar su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks

Registro de modelos con MLflow

Un registro de modelos es tu "Git para modelos". MLflow es el estándar de código abierto.

En su script de entrenamiento, debe registrar el modelo y sus metadatos:

import mlflow
import mlflow.sklearn
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score

# Set up MLflow tracking (can be a local folder or a remote server)
mlflow.set_tracking_uri("http://your-mlflow-server:5000")
mlflow.set_experiment("churn_prediction_v2")

# ... (your training code from above) ...

with mlflow.start_run() as run:
    # --- 1. Log parameters ---
    params = {
        "model_type": "XGBClassifier",
        "n_estimators": 200,
        "learning_rate": 0.05,
        "scale_pos_weight": scale_pos_weight
    }
    mlflow.log_params(params)

    # --- 2. Train the pipeline ---
    churn_pipeline.fit(X_train, y_train)

    # --- 3. Log metrics ---
    y_pred = churn_pipeline.predict(X_test)
    y_probs = churn_pipeline.predict_proba(X_test)[:, 1]
    
    metrics = {
        "f1_score": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "auprc": auc(recall, precision) # From precision_recall_curve
    }
    mlflow.log_metrics(metrics)

    # --- 4. Log the model artifact ---
    # This logs the *entire* Scikit-learn pipeline
    mlflow.sklearn.log_model(
        sk_model=churn_pipeline,
        artifact_path="model",
        registered_model_name="production_churn_model" # Registers the model
    )
    
    print(f"Run ID: {run.info.run_id} logged to MLflow.")

Desde la interfaz de MLflow, ahora puede ver todas las ejecuciones de experimentos y, crucialmente, promocionar una versión de modelo de "Staging" a "Producción."

Inferencia: API en tiempo real con FastAPI

Para la inferencia en tiempo real, un framework web de Python ligero es ideal. FastAPI es el estándar moderno debido a su velocidad y a la validación automática de datos con Pydantic.

1. Crear su archivo de API (main.py):

from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd
import mlflow
import os

# --- 1. Define the input data schema using Pydantic ---
# This provides automatic data validation
class UserFeatures(BaseModel):
    days_since_last_login: int
    session_length_avg_90d: float
    support_tickets_opened_30d: int
    plan_type: str
    user_region: str
    
    # Example for Pydantic v2
    class Config:
        extra = 'allow' # Allows other features not explicitly defined

# --- 2. Load the production model from MLflow ---
# Set the tracking URI
os.environ["MLFLOW_TRACKING_URI"] = "http://your-mlflow-server:5000"

# Load the model version currently tagged as "Production"
model_uri = "models:/production_churn_model/Production"
model = mlflow.pyfunc.load_model(model_uri)

app = FastAPI(title="Churn Prediction API")

@app.post("/predict")
def predict_churn(features: UserFeatures):
    """
    Takes user features as JSON and returns a churn probability.
    """
    # 1. Convert Pydantic model to pandas DataFrame
    # The Scikit-learn pipeline expects a DataFrame
    input_df = pd.DataFrame([features.model_dump()])
    
    # 2. Make prediction
    # The loaded model is a pyfunc wrapper, which matches mlflow.pyfunc.predict signature
    # This automatically handles preprocessing and prediction
    try:
        probability = model.predict(input_df)
        
        # The raw output from an XGB pipeline might be an array
        churn_probability = float(probability[0])
        
        return {
            "user_id": features.user_id, # Assuming user_id is passed in
            "churn_probability": churn_probability,
            "model_version": model.metadata.run_id # For traceability
        }
    except Exception as e:
        return {"error": str(e)}, 500

@app.get("/health")
def health_check():
    return {"status": "ok"}

2. Dockerizar y desplegar:

Esta aplicación de FastAPI se puede contenerizar con un simple Dockerfile y desplegar en cualquier plataforma moderna (Kubernetes, AWS ECS, Google Cloud Run) para una API escalable y de baja latencia.

Fase 4: Monitoreo y Explicabilidad

Tu trabajo no ha terminado al momento de la implementación. Los modelos se degradan.

Desviación de datos frente a desviación del modelo

Debe controlar dos tipos de desviación:

  1. Desplazamiento de datos (Desplazamiento de entrada): Las propiedades estadísticas de tus características de entrada cambian.
    • Ejemplo: Una nueva campaña de marketing atrae a usuarios de un nuevo país. La característica user_region ahora tiene una distribución que tu modelo nunca ha visto.
    • Cómo monitorizar: Utiliza una biblioteca como evidently.ai o whylogs. Compara la distribución estadística (por ejemplo, media, mediana, cardinalidad) de los datos de inferencia entrantes con el conjunto de entrenamiento de referencia. Activa una alerta si el Índice de Estabilidad de la Población (PSI) o la prueba Kolmogorov-Smirnov (KS) tiene un valor p que supera un umbral.
  2. Desplazamiento del modelo (Desplazamiento del concepto): La relación entre las características y la variable objetivo cambia.
    • Ejemplo: Un competidor lanza una nueva función. Ahora, los usuarios de alta actividad (que anteriormente eran "seguros") empiezan a abandonar para utilizar esa función. Tu modelo, entrenado con datos antiguos, ya no es correcto.
    • Cómo monitorizar: Debes tener un bucle de retroalimentación. Registra todas las predicciones. Cuando el evento real de abandono (o la falta de él) se conozca 30 días después, compáralo con tu predicción. Realiza un seguimiento de tu métrica AUPRC a lo largo del tiempo. Si disminuye en más del 10%, activa una alerta para volver a entrenar.

Explicabilidad (XAI)

La empresa no confiará en una "caja negra". Debes poder responder a la siguiente pregunta: "¿Por qué se predice que este usuario abandonará el servicio?

Utilice SHAP (SHapley Additive exPlanations). Es una biblioteca que no depende del modelo y que asigna un valor de "impacto" a cada característica para una predicción específica.

import shap

# ... (Load your 'churn_pipeline' and 'X_test') ...

# 1. Get the 'model' part of your pipeline
model = churn_pipeline.named_steps['model']

# 2. Get the *processed* data from the 'preprocessor' part
processed_X_test = pd.DataFrame(
    churn_pipeline.named_steps['preprocessor'].transform(X_test),
    columns=churn_pipeline.named_steps['preprocessor'].get_feature_names_out()
)

# 3. Create a SHAP explainer
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(processed_X_test)

# 4. Explain a single prediction (e.g., for the first user in the test set)
# This force plot shows which features pushed the prediction
# from the base value (average) to the final output
shap.initjs()
shap.force_plot(explainer.expected_value, shap_values[0,:], processed_X_test.iloc[0,:])

Este gráfico puede integrarse en sus paneles internos, proporcionando al equipo de "Customer Success" puntos de discusión concretos y prácticos (por ejemplo, "La puntuación de este usuario es alta porque el número de días desde el último inicio de sesión es de 45 y ha abierto recientemente 3 tickets de alta prioridad.").

Servicios de Ingeniería de Productos

Colabore con nuestros gestores de proyectos, ingenieros de software y probadores de calidad, para desarrollar su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks

Conclusión

Construir un modelo de abandono de clientes es una tarea esencial de ingeniería de IA. El valor no reside en un único .pkl archivo, sino en la creación de un sistema duradero y automatizado. El éxito se mide no por el AUPRC del modelo, sino por la capacidad de la empresa para utilizar sus resultados para tomar decisiones.

Al centrarse en una arquitectura sólida, una ingeniería de características rigurosa y operaciones continuas de MLOps, pasas de un proyecto de "ciencia de datos" reactivo a un sistema de ingeniería proactivo que genera valor y tiene un impacto directo en los resultados de la empresa.

Preguntas frecuentes

¿Cuál es la arquitectura recomendada para un sistema escalable de predicción de la pérdida de clientes?

Un sistema robusto de predicción de la pérdida de clientes requiere una arquitectura modular que separe el procesamiento de datos, el entrenamiento del modelo y la inferencia. Las implementaciones efectivas suelen utilizar un almacén de características como una única fuente de verdad para gestionar las transformaciones de datos, garantizando la consistencia entre los entornos de entrenamiento y producción. El flujo de trabajo debe incluir un trabajo de entrenamiento programado para actualizar el modelo con datos recientes y una capa de inferencia que soporte tanto procesamiento por lotes(para generar puntuaciones diarias de riesgo de pérdida para los equipos de atención al cliente) y APIs en tiempo real(para la intervención inmediata durante la interacción del usuario).

¿Cómo pueden los desarrolladores prevenir la fuga de datos al entrenar un modelo de abandono?

La fuga de datos a menudo ocurre cuando un modelo utiliza inadvertidamente información del futuro que no estaría disponible en el momento de la predicción. Para evitar esto, los ingenieros deben garantizar la corrección en el tiempo durante la ingeniería de características, calculando métricas estrictamente a partir de datos históricos anteriores a la ventana de predicción. Además, la validación siempre debe emplear división basada en el tiempo (p. ej., entrenar con datos de enero-marzo y validar con datos de abril) en lugar de un simple barajado aleatorio, lo que mezcla de forma inexacta eventos pasados y futuros y infla las métricas de rendimiento.

¿Por qué es esencial la monitorización del cambio en los datos y conceptos para los modelos de predicción de la pérdida de clientes?

Los modelos de aprendizaje automático se degradan con el tiempo a medida que cambian los comportamientos de los clientes y las condiciones del mercado. Cambio de datos ocurre cuando las propiedades estadísticas de las características de entrada cambian (por ejemplo, un nuevo grupo demográfico entra en la base de usuarios), mientras que Cambio de modelo(o cambio de concepto) se produce cuando la relación entre las características y la pérdida de clientes cambia (por ejemplo, los usuarios empiezan a abandonar debido a una nueva función de un competidor). La monitorización continua de métricas como AUPRC (Área bajo la curva de precisión-recall) y el establecimiento de bucles de retroalimentación para comparar las predicciones con los resultados reales permite a los equipos detectar estos cambios y activar la reentrenamiento antes de que el modelo quede obsoleto.