How to Create a Kafka Client in Python?

Sujit Patel
3 min read · Dec 23, 2022

Kafka is a distributed event-streaming platform capable of storing and processing trillions of messages per day. Applications can consume this data in real time to build event-driven systems. In this article, we'll use the Kafka client in Python to publish and consume messages.


Overview

- Create Kafka Docker Instance

- Install the Kafka Python library

- Import the KafkaProducer and KafkaConsumer classes from the kafka-python library

- Produce a message to a topic

- Consume messages from a topic

Links

Kafka-python Git: https://github.com/dpkp/kafka-python

Documentation: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

My Git Repository: https://github.com/psujit775/docker-kafka-python-client.git

Steps

1. Create Kafka Docker Instance

Create a docker-compose.yml file with the content below.

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:latest
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

Run the command below in a terminal to start the Kafka instance:

docker-compose up -d
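
Optionally, you can confirm that both containers came up cleanly before moving on (a quick sanity check, not part of the original steps):

# List the compose services and their current state
docker-compose ps

# Tail the broker logs and wait for a line saying the Kafka server has started
docker logs --tail 20 broker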

2. Install the Kafka Python library:

pip install kafka-python
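
To confirm the library is importable (an optional check), print its version:

python3 -c 'import kafka; print(kafka.__version__)'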

3. Import the producer and consumer classes from the kafka-python library:

from kafka import KafkaProducer, KafkaConsumer

4. Use KafkaProducer to produce messages to a topic:

You can now use these classes to perform various operations, such as producing and consuming messages, listing topics, and more.
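
For example, listing the topics that already exist on the broker takes only a few lines (a quick sketch, assuming the broker from step 1 is running on localhost:9092):

from kafka import KafkaConsumer

# Connect to the local broker and print the set of existing topic names
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())
consumer.close()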

Here is an example of using KafkaProducer to publish messages to a topic.

Create a file named producer.py with the content below.

from kafka import KafkaProducer
from time import sleep
from json import dumps

# Create a producer that JSON-encodes every message value
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Send a message to the "my-topic" topic every 5 seconds
for i in range(100):
    producer.send("my-topic", i)
    print("Sending data : {}".format(i))
    sleep(5)
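
Note that send() is asynchronous: kafka-python buffers records and delivers them in the background. If you want to confirm that a record was actually written (an optional check, not part of the original script), you can block on the future that send() returns, or call flush() before exiting:

# Wait for the broker's acknowledgement and see where the record landed
metadata = producer.send("my-topic", 0).get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

# Or simply make sure everything still buffered is sent before the script exits
producer.flush()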

Open a terminal and run the file with the command below:

python3 producer.py

Sample output (screenshot: producer.py)

5. Use KafkaConsumer to consume messages from a topic:

Create a file named consumer.py with the content below.

from kafka import KafkaConsumer, TopicPartition

# Create a consumer
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=True,
                         group_id='my-group',
                         auto_offset_reset='earliest')

# Read from partition 0 of "my-topic"
topic_partition = TopicPartition("my-topic", 0)
consumer.assign([topic_partition])

# Comment out the line below to read only new messages.
consumer.seek_to_beginning()

# Print every message value as it arrives
for message in consumer:
    print(message.value.decode('utf-8'))
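
As written, the loop blocks indefinitely waiting for new records, which is usually what you want for a long-running consumer. If you would rather have the script exit once no message arrives for a while (an optional tweak, not in the original), kafka-python's consumer_timeout_ms setting stops the iteration after the given idle time:

# Stop iterating once no message has been received for 10 seconds
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=True,
                         group_id='my-group',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)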

Open a terminal and run the file with the command below:

python3 consumer.py

Sample output (screenshot: consumer.py)

Conclusion

In this article, we learned how to send and receive simple messages with the Python Kafka client. The same producer and consumer patterns scale to much larger, high-throughput workloads, with the Kafka broker acting as the intermediary between producers and consumers.


Sujit Patel

DevOps Engineer, Linux lover, Technology and Automation enthusiast. A strong believer in continuous learning.