Consommer des messages Kafka avec Python (kafka-python)

Kafka est une plateforme distribuée de messagerie permettant de diffuser des flux de données en temps réel. Dans une architecture de traitement de données, Kafka agit souvent comme un système central de communication entre microservices, processus de traitement, bases de données, etc.

L’objectif de ce tutoriel est d’apprendre à consommer des messages depuis un topic Kafka à l’aide du langage Python. Pour cela, nous allons utiliser une bibliothèque très populaire : kafka-python.


Prérequis

Avant de commencer, assurez-vous que :


Installation de la bibliothèque kafka-python

Kafka-python est une interface simple mais puissante pour interagir avec Kafka depuis Python. Elle est légère, pure Python, et très bien documentée.

pip install kafka-python

Vérifiez ensuite que l’installation a réussi :

python -c "from kafka import KafkaConsumer; print('Kafka OK')" 

Concepts importants avant de consommer

Avant d’écrire du code, quelques concepts clés à comprendre :


Créer un consommateur Kafka simple en Python

Commençons par écrire un script de base qui se connecte à Kafka et affiche les messages du topic data-events.

from kafka import KafkaConsumer

# Création du consommateur
consumer = KafkaConsumer(
    'data-events',  # nom du topic
    bootstrap_servers='localhost:9092',  # adresse du broker Kafka
    auto_offset_reset='earliest',  # commence depuis le début du topic
    enable_auto_commit=True,  # Kafka mettra à jour automatiquement l’offset
    group_id='groupe-data-science',  # identifiant de groupe
    value_deserializer=lambda x: x.decode('utf-8')  # transformation des bytes en string
)

# Boucle infinie de lecture
for message in consumer:
    print(f"Message reçu : {message.value}")
  

Explication détaillée des paramètres


Lire les métadonnées du message

Kafka vous donne accès à bien plus que le simple contenu du message :

for message in consumer:
    print("Contenu :", message.value)
    print("Partition :", message.partition)
    print("Offset :", message.offset)
    print("Clé :", message.key)
    print("Timestamp :", message.timestamp)
  

Ces informations sont utiles pour faire du monitoring, de la relecture ou de l’audit.


Gestion propre de l’arrêt du consommateur

Pour éviter de laisser un consommateur ouvert (et bloqué), il est conseillé d’utiliser un bloc try/except avec fermeture explicite.

try:
    for message in consumer:
        print(f"Message : {message.value}")
except KeyboardInterrupt:
    print("Arrêt du consommateur.")
finally:
    consumer.close()
  

Tester avec un producteur manuel

Ouvrez un terminal et utilisez Kafka CLI pour envoyer un message :

bin/kafka-console-producer.sh --topic data-events --bootstrap-server localhost:9092

Ensuite, tapez un message dans le terminal et validez avec Entrée. Il sera instantanément affiché par le consommateur Python.


Consommer une seule partition

Si vous souhaitez consommer uniquement une partition spécifique d’un topic, vous pouvez utiliser la méthode assign() :

from kafka import TopicPartition

partition = TopicPartition('data-events', 0)
consumer.assign([partition])

for msg in consumer:
    print(f"[P0] {msg.value}")
  

Cela peut être utile pour les systèmes de traitement déterministe ou le debugging.


Lecture contrôlée avec gestion manuelle de l’offset

Dans des cas plus complexes (ex : systèmes critiques), il est préférable de gérer manuellement les offsets :

consumer = KafkaConsumer(
    'data-events',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=False,  # désactive le commit automatique
    group_id='groupe-analyse',
    auto_offset_reset='earliest',
    value_deserializer=lambda x: x.decode('utf-8')
)

for message in consumer:
    print(message.value)
    consumer.commit()  # commit explicite de l'offset
  

Attention : si vous oubliez de commettre les offsets manuellement, votre consumer relira les mêmes messages à chaque redémarrage.


Conclusion

Vous venez de créer un consommateur Kafka complet en Python, capable de lire des messages depuis un topic, de décoder leur contenu, et de gérer l’offset. Cela vous permet de mettre en place un traitement temps réel de vos flux de données.

Dans le prochain tutoriel, nous verrons comment produire des messages Kafka en Python, afin de créer un pipeline complet de publication/consommation de données.