Diseñar tuberías de datos en tiempo real con Apache Kafka

Diseñar tuberías de datos en tiempo real con Apache Kafka

En las empresas modernas, la demanda de datos en tiempo real ya no es un lujo, sino una necesidad fundamental del negocio. Desde potenciar paneles de análisis en tiempo real y entrenar modelos de aprendizaje automático hasta habilitar microservicios basados en eventos, la capacidad de capturar, procesar y reaccionar a los datos a medida que se generan representa una ventaja competitiva significativa.

Apache Kafka se ha convertido en el estándar de código abierto por defecto para construir estas tuberías de transmisión de datos en tiempo real. Sin embargo, pasar de una configuración simple de "Hola Mundo" de Kafka a una arquitectura de producción, robusta y escalable, implica decisiones de diseño críticas. Una tubería mal configurada puede provocar la pérdida de datos, cuellos de botella en el procesamiento y fallos sistémicos en cascada.

Este artículo proporciona una guía técnica para arquitectos, directores de tecnología (CTOs) e ingenieros senior, explicando cómo diseñar e implementar una robusta canalización de Kafka. Nos centraremos en patrones arquitectónicos, compromisos de configuración y detalles de implementación prácticos, esenciales para sistemas a gran escala. Los principios discutidos aquí son fundamentales para los tipos de plataformas robustas utilizadas por organizaciones líderes, a menudo desarrolladas a través de servicios de consultoría especializados de ingeniería de datos para empresas de la lista Fortune 500. empresas. compañías.

Pilares Arquitectónicos Fundamentales: Diseño de Esquema y Tema

Antes de escribir una sola línea de código, la base de una plataforma escalable se basa en dos elementos: cómo se estructura los datos (esquema) y cómo se organizan (temas).

Servicios de Ingeniería de Productos

Trabaje con nuestros gestores de proyectos, ingenieros de software y probadores de calidad internos para desarrollar su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks

La importancia de un contrato de datos: Gestión de esquemas

Considerar los flujos de datos como "simplemente JSON" es un error arquitectónico común que conduce al caos en etapas posteriores. Una tubería robusta exige un contrato de datos estricto. Es aquí donde un Registro de Esquemas se vuelve indispensable.

Recomendamos encarecidamente utilizar Apache Avro junto con el Confluent Schema Registry.

  • ¿Por qué Avro?
    • Compacto: La serialización binaria es significativamente más pequeña que JSON/XML basado en texto.
    • Rápido: La serialización/deserialización es extremadamente eficiente, reduciendo la sobrecarga de CPU en productores y consumidores.
    • Evolución del esquema: Esta es la principal ventaja. Un registro de esquemas le permite aplicar reglas de compatibilidad (por ejemplo, Hacia atrás, Hacia adelante, COMPLETO), asegurando que las nuevas versiones de productores no afecten a los consumidores existentes.

Ejemplo de implementación: Esquema Avro

Un esquema Avro (.avsc) para un evento de interacción del usuario podría verse así:.avsc) para un evento de interacción con el usuario, podría ser algo así:

{
  "type": "record",
  "namespace": "com.mycompany.events",
  "name": "UserInteraction",
  "fields": [
    { "name": "user_id", "type": "string" },
    { "name": "event_type", "type": { "type": "enum", "name": "InteractionType", "symbols": ["CLICK", "VIEW", "PURCHASE"] } },
    { "name": "timestamp_ms", "type": "long", "logicalType": "timestamp-millis" },
    { "name": "page_url", "type": ["null", "string"], "default": null }
  ]
}

Decisión Arquitectónica: Siempre configure la compatibilidad de su esquema a ADVERSO. Esto significa que los nuevos esquemas pueden agregar nuevos campos (con valores predeterminados) o eliminar campos opcionales, pero no pueden eliminar campos obligatorios. Esto garantiza que los consumidores existentes, que aún no se han actualizado, no fallarán al leer nuevos mensajes.

Tema y Estrategia de Partición

Un tema no es solo un nombre; es una unidad de escalabilidad. El número de particiones en un tema determina la máximo nivel de paralelización de tu capa de consumo.

  • Nomenclatura de temas: Adopte una convención de nomenclatura clara y jerárquica, por ejemplo:service.domain.event(por ejemplo:payments.core.transaction_authorized).
  • Tamaño de las particiones: Esto es más arte que ciencia, pero un buen punto de partida es considerar:
    • Tasa de procesamiento objetivo: Si un tema necesita procesar 100.000 mensajes/segundo y una partición de consumidor puede procesar 5.000 mensajes/segundo, necesitará al menos 20 particiones.
    • Paralelismo de los consumidores: Si tiene un servicio de consumidor (un "Grupo de Consumidores") que planea escalar a 10 instancias, necesitará al menos 10 particiones. Puede tener más particiones que consumidores, pero no más consumidores (en un solo grupo) que particiones.
    • Clave: Si está utilizando claves para mensajes (por ejemplo, user_id) para garantizar el orden para ese usuario, todos los mensajes para esa clave aterrizarán en la misma partición. Esto puede crear "particiones con alta demanda" si su distribución de claves está sesgada.

Regla general: Comience con un número moderado de particiones (p. ej., 12, 24) y planifique para monitorear el rendimiento. Es fácil agregar particiones más tarde, pero es muy difícil reducirlas.muy difícil reducirlas.

Implementando Productores Resilientes

El trabajo de un productor es enviar datos de forma fiable y eficiente. "Enviar y olvidar" no es una opción en un sistema de producción.

Configuraciones Clave del Productor para la Fiabilidad

La configuración de su productor debe ajustarse para cumplir con sus requisitos específicos de durabilidad.

This HTML code snippet describes a scenario related to message queuing and idempotency, likely within a distributed system. Let's break it down: **Overall Structure:** * It consists of a table with three `` (table row) elements. * Each row contains three `` (table data) elements, each holding a piece of information. * The table likely represents a configuration or a description of a system component. **Row-by-Row Explanation:** * **Row 1:** * `Coupled with idempotence, this ensures the producer will retry "forever" (or until delivery.timeout.ms) to successfully write the message.` * This describes a critical characteristic: **Idempotency**. Idempotency means that applying an operation multiple times has the same effect as applying it once. This is crucial in distributed systems where messages might be delivered more than once due to network issues or failures. * The message emphasizes that the producer will keep retrying until either the message is successfully written (success) or the `delivery.timeout.ms` value is reached (failure). `delivery.timeout.ms` likely represents a time limit for the message to be delivered. * This is a robust design, preventing the system from getting stuck in an infinite retry loop if the message cannot be delivered. * **Row 2:** * `Allow for transient errors but fail faster if the cluster is unhealthy.` * This describes a more sophisticated error handling strategy. It acknowledges that transient errors (temporary issues like network hiccups) are expected and will be handled by retrying. * However, it also includes a "fail faster" mechanism. If the cluster (the collection of servers) is unhealthy (meaning there's a persistent problem, not just a temporary blip), the system will stop retrying and instead, declare a failure. This is important to prevent the system from wasting resources on retrying messages that will never be successfully delivered due to a fundamental problem. * **Row 3:** * This row is empty. This likely indicates a summary, conclusion, or a related statement. It's important to note that the full context is missing, but based on the preceding rows, it probably relates to the benefits of this design. **Key Concepts and Their Importance:** * **Message Queuing:** A messaging queue (like RabbitMQ, Kafka, or a cloud-based queue) is used for asynchronous communication between different parts of a system. It allows producers to send messages without needing to wait for immediate responses. * **Idempotency:** This is essential for reliability in distributed systems. If a message is delivered multiple times, it should only have the intended effect once. This prevents data corruption and ensures consistency. * **Idempotency Keys/Transaction IDs:** Often, producers generate a unique ID for each message. The message queue then uses this ID to track whether the message has already been processed. If the ID is already present, the queue knows it should not process the message again. * **`delivery.timeout.ms`:** This is a setting in the message queue configuration. It defines the maximum amount of time the queue will wait for a message to be successfully processed by a consumer. After this time, the message is typically considered failed and can be retried or discarded. * **"Fail Fast":** This is a common principle in software development. It means that if there's a problem, the system should detect it quickly and stop processing further requests or messages to prevent further errors. **In Summary:** The HTML code describes a robust design for a message queuing system that prioritizes reliability and fault tolerance. It uses idempotency to handle duplicate messages and a "fail fast" strategy to quickly detect and address cluster-level issues. This approach is crucial for building resilient and scalable distributed systems.

Servicios de Ingeniería de Productos

Colabore con nuestros gestores de proyectos, ingenieros de software y probadores de calidad para desarrollar su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks

Ejemplo de código de productor (Python con Registro de Esquemas)

Este ejemplo demuestra un productor robusto que serializa con Avro, utiliza una función de devolución de llamada para el manejo de errores, y está configurado para la idempotencia.

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.cimpl import KafkaException
import socket

# 1. Define Schema Registry and Avro Serializer
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

value_schema_str = """
{
  "type": "record", "name": "UserInteraction", ... (schema from above)
}
"""
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

# 2. Configure Producer for Idempotence and Reliability
producer_conf = {
    'bootstrap.servers': 'kafka-broker-1:9092,kafka-broker-2:9092',
    'client.id': socket.gethostname(),
    'enable.idempotence': True,
    'acks': 'all',
    'retries': 5,
    'compression.type': 'snappy', # Always use compression
    'linger.ms': 10,              # Batch records for 10ms
    'batch.size': 32768,          # 32KB batch size
    'value.serializer': avro_serializer
}

producer = SerializingProducer(producer_conf)

# 3. Implement Delivery Callback for Error Handling
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result. """
    if err is not None:
        print(f"Message delivery failed for user {msg.key()}: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

# 4. Produce messages
def produce_event(topic, key, value):
    try:
        # poll(0) is critical to process delivery callbacks
        producer.poll(0) 
        
        producer.produce(
            topic=topic,
            key=str(key), # Keying by user_id ensures ordering for that user
            value=value,
            on_delivery=delivery_report
        )
    except KafkaException as e:
        print(f"Kafka exception: {e}")
    except ValueError as e:
        print(f"Invalid input (serialization failed?): {e}")

# Example usage:
event_data = {
    "user_id": "u-123",
    "event_type": "CLICK",
    "timestamp_ms": 1678886400000,
    "page_url": "/products/abc"
}
produce_event('prod.web.user_interactions', event_data['user_id'], event_data)

# 5. Wait for all messages to be delivered
producer.flush() 

Implementación de Consumidores Escalables y Resistentes a Fallos

El principal desafío de un consumidor es procesar mensajes de manera eficientesin pérdida de datos y para gestionar eventos de clúster, comoreajustes.

El principio fundamental: Envío manual de compensaciones

La configuración más importante para un consumidor confiable es habilitar auto.commit=false.

Nunca confíes en el auto-commit. Este realiza los ajustes de forma automática en segundo plano a intervalos fijos, lo que significa que tu aplicación podría fallar después de procesar un mensaje pero antes de que se haya realizado el ajuste. Al reiniciarse, volverá a procesar ese mensaje (al menos una vez). Peor aún, podría fallar antes de procesar un mensaje pero después de que se haya realizado el ajuste, lo que podría provocar la pérdida de datos ..

El patrón correcto: Consumir -> Procesar -> Confirmar

  1. Consuma un conjunto de registros.
  2. Procesa los registros (p. ej., escribe en una base de datos, llama a una API).
  3. Guarda los offsets de los registros que se hayan procesado correctamente manualmente.

Ejemplo de Código para Consumidores (Python con Comprobaciones Manuales)

Este ejemplo demuestra el patrón de confirmación manual y la deserialización de Avro.

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.cimpl import KafkaError, KafkaException

# 1. Define Schema Registry and Avro Deserializer
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# We can just fetch the schema by subject name, no need to hardcode
value_subject_name = 'prod.web.user_interactions-value'
avro_deserializer = AvroDeserializer(
    schema_registry_client,
    from_subject=value_subject_name 
)

# 2. Configure Consumer
consumer_conf = {
    'bootstrap.servers': 'kafka-broker-1:9092,kafka-broker-2:9092',
    'group.id': 'user-interaction-analytics-service',
    'auto.offset.reset': 'earliest', # Start from beginning if no offset
    'enable.auto.commit': False,     # CRITICAL: Manual commits
    'value.deserializer': avro_deserializer
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['prod.web.user_interactions'])

# 3. The Poll Loop
try:
    while True:
        # Poll for new messages (1.0s timeout)
        msg = consumer.poll(1.0)

        if msg is None:
            # No message received
            continue
        
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event - not an error
                print(f"Reached end of partition {msg.topic()} [{msg.partition()}]")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Message successfully consumed
            event_data = msg.value()
            print(f"Processing event for user {event_data['user_id']}...")
            
            # --- BEGIN BUSINESS LOGIC ---
            # e.g., write_to_data_warehouse(event_data)
            # This logic MUST be idempotent or handle duplicates,
            # as we are guaranteeing at-least-once processing.
            # --- END BUSINESS LOGIC ---

            # 4. Manually commit the offset after successful processing
            # We commit asynchronously for performance
            consumer.commit(asynchronous=True)

except KeyboardInterrupt:
    print("Shutting down consumer...")
finally:
    # 5. Cleanly close the consumer
    # This will commit final offsets and trigger a rebalance
    consumer.close()

Monitoreo operativo: Qué deben realizar los directores de tecnología

Una plataforma en funcionamiento no es una plataforma saludable. Sin supervisión, estarás sin rumbo. Concéntrate en estas métricas clave para tus paneles de control:

Servicios de Ingeniería de Productos

Trabaje con nuestros gestores de proyectos, ingenieros de software y probadores de calidad internos para crear su nuevo producto de software personalizado o para apoyar su flujo de trabajo actual, siguiendo metodologías Agile, DevOps y Lean.

Build with 4Geeks
  1. Retraso del Consumidor (La métrica #1):
    • ¿Qué es?: La diferencia (en número de mensajes) entre el último mensaje escrito en una partición y el último mensaje confirmado por un grupo de consumidores.
    • ¿Por qué es importante?: Este es su "acreimiento" de procesamiento. Si el retraso está creciendo constantemente, sus consumidores no pueden seguir el ritmo de sus productores. Esto indica la necesidad de escalar su grupo de consumidores (añadir más instancias) o optimizar su lógica de procesamiento.
  2. Broker: Particiones subreplicadas:
    • ¿Qué es?: Un recuento de particiones donde el número de réplicas en sincronía (ISRs) es menor que el factor de replicación configurado.
    • ¿Por qué es importante?: Cualquier valor distinto de cero es una alerta de alta prioridad. Significa que ha perdido la tolerancia a fallos para esa partición. Si el líder restante falla, experimentará pérdida de datos o una partición no disponible.
  3. Productor: Latencia y Tasa de Errores de la Solicitud:
    • ¿Qué es?: El tiempo que tarda un productor en recibir una confirmación del broker.
    • ¿Por qué es importante?: Los picos de latencia son una señal temprana de estrés en el lado del broker (p. ej., cuellos de botella en la E/S, alto uso de la CPU).
  4. Clúster: ISRs que se reducen/aumentan:
    • ¿Qué es?: La tasa a la que las réplicas son "eliminadas" del conjunto de ISRs (p. ej., debido a problemas de red, pausas de GC) y luego se vuelven a añadir.
    • ¿Por qué es importante?: Un ISR "flapping" indica inestabilidad en el clúster o problemas de red entre los brokers.

De Pipeline a Plataforma

Construir una tubería de datos con Apache Kafka es un importante proyecto de ingeniería. Al ir más allá de las configuraciones predeterminadas y centrarse en un diseño diseño basado en el esquemabasado en el esquema, productores idempotentes con acks=all, y , se establece una base de fiabilidad.

Esta plataforma no es solo una herramienta de integración; se convierte en un sistema nervioso central para su negocio. Es la plataforma ideal para el análisis en tiempo real, microservicios impulsados por eventos y el procesamiento de flujos complejos. Asegurarse de que la arquitectura sea correcta desde el principio es el primer paso crucial para transformar su organización en una empresa verdaderamente orientada a los datos.

Preguntas frecuentes

¿Para qué se utiliza Apache Kafka?

Apache Kafka es una plataforma de código abierto utilizada para construir tuberías de transmisión de datos en tiempo real. Permite a los sistemas capturar, procesar y reaccionar a los datos a medida que se generan. Esta capacidad es esencial para potenciar paneles de análisis en tiempo real, habilitar microservicios basados en eventos y entrenar modelos de aprendizaje automático con datos frescos.

¿Por qué es crucial un Registro de Esquemas al utilizar Kafka?

Un Registro de Esquemas es crucial porque establece un "contrato de datos" estricto para los flujos de datos, lo que evita que las aplicaciones posteriores fallen cuando cambian las estructuras de datos. Al gestionar los esquemas (como Apache Avro), garantiza que los datos sean compactos y serializados de manera eficiente. También controla la evolución de los esquemas, permitiendo que se publiquen nuevas versiones de los datos sin interrumpir a los consumidores existentes.

¿Cuál es la forma más fiable para que un consumidor de Kafka procese mensajes?

El método más fiable es que el consumidor desactive "auto-commit" y gestione manualmente los offsets. El patrón correcto es:Consumir un mensaje, procesar la lógica de negocio (por ejemplo, escribir en una base de datos), y solo entoncesCometer el offset de nuevo en Kafka. Estoenable.auto.commit=false asegura que, si la aplicación falla, no perderá datos (al cometer antes de procesar) ni creará duplicados (al procesar pero fallar antes de cometer).