Produire des messages Kafka
Kafka est une plateforme distribuée de streaming conçue pour publier, stocker et traiter des flux de données en temps réel. Le producteur Kafka est un composant essentiel qui envoie des messages dans un ou plusieurs topics.
Dans ce tutoriel, vous apprendrez à créer un producteur Kafka en Python grâce à la bibliothèque kafka-python
. Ce guide vous montrera comment connecter un producteur à un cluster Kafka, formater des messages et les publier de manière fiable.
Prérequis
- Kafka est installé et en cours d’exécution sur
localhost:9092
. - Un topic nommé
data-events
est déjà créé dans Kafka. - Python 3.7 ou supérieur est installé.
- Vous avez installé la bibliothèque
kafka-python
.
Installation de kafka-python (si ce n’est pas déjà fait)
pip install kafka-python
Ce module vous permet d’interagir avec Kafka sans dépendance externe, via une interface Python native.
Comprendre le rôle d’un producteur Kafka
Un producteur Kafka est responsable de :
- Se connecter à un ou plusieurs brokers Kafka.
- Construire des messages à envoyer dans un topic Kafka.
- Choisir (ou laisser Kafka choisir) une partition cible.
- S’assurer que les messages sont bien envoyés, éventuellement avec accusé de réception (ACK).
Création d’un producteur simple
Voici un premier exemple de producteur qui envoie un message texte dans le topic data-events
:
from kafka import KafkaProducer
# Création du producteur
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: v.encode('utf-8') # transforme la chaîne en bytes
)
# Envoi d’un message
producer.send('data-events', value='Bonjour Kafka !')
# On attend que les messages soient bien envoyés
producer.flush()
Explications
bootstrap_servers
: l’adresse du broker Kafka.value_serializer
: Kafka attend desbytes
, donc on encode la chaîne en UTF-8.send()
: envoie asynchrone. Il renvoie un objetFuture
que l’on peut utiliser pour gérer les retours.flush()
: force l’envoi immédiat des messages en attente.
Utilisation du key
pour influencer la partition
Kafka utilise la clé pour déterminer dans quelle partition le message sera stocké. Cela permet de garantir l’ordre des messages associés à une même clé.
producer.send(
'data-events',
key=b'cle-utilisateur-42',
value='Message pour un utilisateur précis'.encode('utf-8')
)
Si vous omettez la clé, Kafka répartira les messages aléatoirement entre les partitions (round-robin).
Utilisation des callback
pour vérifier la livraison
Kafka-python permet d’attacher un callback au message envoyé pour savoir s’il a bien été livré ou s’il y a eu une erreur :
from kafka.errors import KafkaError
def on_success(record_metadata):
print(f"Succès : topic={record_metadata.topic}, partition={record_metadata.partition}, offset={record_metadata.offset}")
def on_error(exc):
print(f"Erreur d'envoi : {exc}")
# Envoi avec callbacks
future = producer.send('data-events', value='Message important'.encode('utf-8'))
future.add_callback(on_success)
future.add_errback(on_error)
producer.flush()
Pourquoi c’est utile :
- En production, vous devez toujours vérifier que vos messages sont bien livrés.
- En cas d’échec (broker hors ligne, topic inexistant, etc.), vous pouvez déclencher une alerte ou une tentative de réenvoi.
Envoyer un message JSON
Les messages Kafka peuvent contenir n’importe quel format, tant que vous les encodez correctement. Par exemple, pour envoyer un message JSON :
import json
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
data = {'id': 101, 'event': 'click', 'timestamp': '2025-07-16T10:00:00'}
producer.send('data-events', value=data)
producer.flush()
C’est la méthode recommandée pour envoyer des objets complexes ou structurés (événements, logs, etc.).
Fermer proprement le producteur
Il est important de fermer le producteur avec close()
pour s’assurer que tous les messages sont bien envoyés avant l’arrêt de l’application.
producer.flush()
producer.close()
Exemple complet d’un producteur Kafka Python
from kafka import KafkaProducer
import json
import time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
try:
for i in range(10):
event = {
'event_id': i,
'type': 'temperature',
'value': 20 + i,
'timestamp': time.time()
}
producer.send('data-events', value=event)
print(f"Message envoyé : {event}")
time.sleep(1) # simulation d’un flux de données
except KeyboardInterrupt:
print("Arrêt manuel.")
finally:
producer.flush()
producer.close()
Conclusion
Vous avez maintenant toutes les bases pour publier des messages Kafka à partir d’un programme Python. Que vous envoyiez des logs, des événements utilisateurs, des métriques ou des messages métier, Kafka vous offre un système distribué fiable et performant.
N’hésitez pas à combiner ce producteur avec le consommateur vu dans le tutoriel précédent pour tester l’envoi et la réception de messages dans une application complète.