Consommer des messages Kafka avec Python (kafka-python)
Kafka est une plateforme distribuée de messagerie permettant de diffuser des flux de données en temps réel. Dans une architecture de traitement de données, Kafka agit souvent comme un système central de communication entre microservices, processus de traitement, bases de données, etc.
L’objectif de ce tutoriel est d’apprendre à consommer des messages depuis un topic Kafka à l’aide du langage Python. Pour cela, nous allons utiliser une bibliothèque très populaire : kafka-python
.
Prérequis
Avant de commencer, assurez-vous que :
- Un broker Kafka est en cours d’exécution sur
localhost:9092
. - Vous disposez d’un topic Kafka nommé
data-events
(vous pouvez le créer avec la CLI de Kafka si nécessaire). - Python 3.7 ou supérieur est installé sur votre machine.
Installation de la bibliothèque kafka-python
Kafka-python est une interface simple mais puissante pour interagir avec Kafka depuis Python. Elle est légère, pure Python, et très bien documentée.
pip install kafka-python
Vérifiez ensuite que l’installation a réussi :
python -c "from kafka import KafkaConsumer; print('Kafka OK')"
Concepts importants avant de consommer
Avant d’écrire du code, quelques concepts clés à comprendre :
- Topic : flux de messages catégorisés. Pensez à un « canal » d’informations.
- Partition : chaque topic est découpé en partitions, qui permettent une lecture parallèle et distribuée.
- Consumer : un client Kafka qui lit les messages.
- Consumer Group : un ensemble de consumers qui se partagent la lecture des partitions d’un topic.
- Offset : position d’un message dans une partition, Kafka garde l’historique de tous les offsets.
Créer un consommateur Kafka simple en Python
Commençons par écrire un script de base qui se connecte à Kafka et affiche les messages du topic data-events
.
from kafka import KafkaConsumer
# Création du consommateur
consumer = KafkaConsumer(
'data-events', # nom du topic
bootstrap_servers='localhost:9092', # adresse du broker Kafka
auto_offset_reset='earliest', # commence depuis le début du topic
enable_auto_commit=True, # Kafka mettra à jour automatiquement l’offset
group_id='groupe-data-science', # identifiant de groupe
value_deserializer=lambda x: x.decode('utf-8') # transformation des bytes en string
)
# Boucle infinie de lecture
for message in consumer:
print(f"Message reçu : {message.value}")
Explication détaillée des paramètres
'data-events'
: le nom du topic que vous consommez. Il doit exister dans votre broker Kafka.bootstrap_servers
: liste des adresses (host:port) des brokers Kafka. Un seul suffit pour commencer.auto_offset_reset
: comportement si aucun offset n’est trouvé. Valeurs possibles :'earliest'
: commence depuis le tout début du topic.'latest'
: commence avec les prochains messages uniquement.
enable_auto_commit
: si activé, Kafka se charge d’enregistrer automatiquement l’offset des messages consommés. Utile pour des lectures simples.group_id
: identifiant du groupe. Kafka s’en sert pour coordonner plusieurs consommateurs et répartir les partitions.value_deserializer
: fonction qui transforme les bytes reçus en objets lisibles. Ici, on les transforme en chaînes UTF-8.
Lire les métadonnées du message
Kafka vous donne accès à bien plus que le simple contenu du message :
for message in consumer:
print("Contenu :", message.value)
print("Partition :", message.partition)
print("Offset :", message.offset)
print("Clé :", message.key)
print("Timestamp :", message.timestamp)
Ces informations sont utiles pour faire du monitoring, de la relecture ou de l’audit.
Gestion propre de l’arrêt du consommateur
Pour éviter de laisser un consommateur ouvert (et bloqué), il est conseillé d’utiliser un bloc try/except
avec fermeture explicite.
try:
for message in consumer:
print(f"Message : {message.value}")
except KeyboardInterrupt:
print("Arrêt du consommateur.")
finally:
consumer.close()
Tester avec un producteur manuel
Ouvrez un terminal et utilisez Kafka CLI pour envoyer un message :
bin/kafka-console-producer.sh --topic data-events --bootstrap-server localhost:9092
Ensuite, tapez un message dans le terminal et validez avec Entrée. Il sera instantanément affiché par le consommateur Python.
Consommer une seule partition
Si vous souhaitez consommer uniquement une partition spécifique d’un topic, vous pouvez utiliser la méthode assign()
:
from kafka import TopicPartition
partition = TopicPartition('data-events', 0)
consumer.assign([partition])
for msg in consumer:
print(f"[P0] {msg.value}")
Cela peut être utile pour les systèmes de traitement déterministe ou le debugging.
Lecture contrôlée avec gestion manuelle de l’offset
Dans des cas plus complexes (ex : systèmes critiques), il est préférable de gérer manuellement les offsets :
consumer = KafkaConsumer(
'data-events',
bootstrap_servers='localhost:9092',
enable_auto_commit=False, # désactive le commit automatique
group_id='groupe-analyse',
auto_offset_reset='earliest',
value_deserializer=lambda x: x.decode('utf-8')
)
for message in consumer:
print(message.value)
consumer.commit() # commit explicite de l'offset
Attention : si vous oubliez de commettre les offsets manuellement, votre consumer relira les mêmes messages à chaque redémarrage.
Conclusion
Vous venez de créer un consommateur Kafka complet en Python, capable de lire des messages depuis un topic, de décoder leur contenu, et de gérer l’offset. Cela vous permet de mettre en place un traitement temps réel de vos flux de données.
Dans le prochain tutoriel, nous verrons comment produire des messages Kafka en Python, afin de créer un pipeline complet de publication/consommation de données.