Apache Kafka - Setting Up a Cluster (Part 2)
This article is meant for a production environment where your nodes sit in multiple availability zones or in different regions; the goal is for one site to serve as a failover for the other.
In this setup, we will use two nodes.
Step 1: Install Java (on both machines)
sudo apt install openjdk-21-jdk -y
Next, check the Java version:
java -version
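You should see output along these lines (the exact version and build strings will vary with your distribution):
openjdk version "21.0.4" 2024-07-16
OpenJDK Runtime Environment (build 21.0.4+7-Ubuntu-1ubuntu2)
OpenJDK 64-Bit Server VM (build 21.0.4+7-Ubuntu-1ubuntu2, mixed mode, sharing)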
Step 2: Download and Extract Kafka
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xzf kafka_2.13-3.9.0.tgz
mv kafka_2.13-3.9.0 kafka
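The rest of this guide assumes Kafka now lives at ~/kafka (i.e. /home/ubuntu/kafka for the ubuntu user). You can confirm the scripts are in place with:
ls ~/kafka/bin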
Step 3: Create separate data directories for Kafka messages and Zookeeper data
Create new directories for Kafka and Zookeeper:
sudo mkdir -p /kafka
sudo mkdir -p /zookeeper
Change the ownership of those directories; we are using ubuntu as the user:
sudo chown -R ubuntu:ubuntu /kafka
sudo chown -R ubuntu:ubuntu /zookeeper
Next, edit zookeeper.properties (it ships in Kafka's config directory). Replace zookeeper1 and zookeeper2 with IP addresses, or with machine names if you have DNS set up:
nano ~/kafka/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/zookeeper
# the port at which the clients will connect
clientPort=2181
# setting number of connections to unlimited
maxClientCnxns=0
# keeps a heartbeat of zookeeper in milliseconds
tickTime=2000
# time for initial synchronization
initLimit=10
# how many ticks can pass before timeout
syncLimit=5
# define servers ip and internal ports to zookeeper
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
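Each Zookeeper node also needs a myid file in its data directory whose contents match its server.N entry above; the ensemble will not form without it. Run the matching command on each node:
echo "1" > /zookeeper/myid   # on the server.1 (zookeeper1) node
echo "2" > /zookeeper/myid   # on the server.2 (zookeeper2) node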
Step 4: Configure Kafka
Replace <Zookeeper_Cluster_SiteA> with the connection string for Site A's Zookeeper cluster; for Site B, use <Zookeeper_Cluster_SiteB>. Also change broker.id to 2 on Site B.
nano ~/kafka/config/server.properties
broker.id=1
log.dirs=/kafka
zookeeper.connect=<Zookeeper_Cluster_SiteA>
listeners=PLAINTEXT://<Broker_IP>:9092
advertised.listeners=PLAINTEXT://<Broker_IP>:9092
num.network.threads=3
num.io.threads=8
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
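For illustration, here is how Site A's file might look once the placeholders are filled in, using the hypothetical 192.168.1.x addresses that appear later in this article and assuming Zookeeper listens on port 2181; substitute your own hosts:
broker.id=1
log.dirs=/kafka
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181
listeners=PLAINTEXT://192.168.1.1:9092
advertised.listeners=PLAINTEXT://192.168.1.1:9092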
Step 5: Set up Zookeeper and Kafka as systemd services
Create a Zookeeper systemd unit:
sudo nano /etc/systemd/system/zookeeper.service
Add the following:
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
After=network.target
[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/zookeeper-server-start.sh /home/ubuntu/kafka/config/zookeeper.properties
ExecStop=/home/ubuntu/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Create a Kafka systemd unit:
sudo nano /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
After=network.target zookeeper.service
[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/kafka-server-start.sh /home/ubuntu/kafka/config/server.properties
ExecStop=/home/ubuntu/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
2. Configuring MirrorMaker 2 for Failover
Step 1: MirrorMaker 2 setup (on both nodes)
MirrorMaker 2 ships with the Kafka distribution you already extracted, so there is nothing separate to install; deploy the same configuration file on both nodes.
Create a configuration file (mm2.properties):
# Source cluster
clusters=SiteA,SiteB
# Site A configuration
SiteA.bootstrap.servers=<Broker_List_SiteA>
SiteA.consumer.group.id=mm2-consumer
SiteA.offset.storage.topic=mm2-offsets
SiteA.config.storage.topic=mm2-configs
SiteA.status.storage.topic=mm2-status
# Site B configuration
SiteB.bootstrap.servers=<Broker_List_SiteB>
SiteB.consumer.group.id=mm2-consumer
SiteB.offset.storage.topic=mm2-offsets
SiteB.config.storage.topic=mm2-configs
SiteB.status.storage.topic=mm2-status
# Replication policies
replication.policy.separator=_
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Enable replication
SiteA->SiteB.enabled=true
# Activate the reverse direction only when failing over from Site B back to Site A:
#SiteB->SiteA.enabled=true
Start MirrorMaker 2:
bin/connect-mirror-maker.sh mm2.properties
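With DefaultReplicationPolicy and the _ separator configured above, topics mirrored from Site A show up on Site B prefixed with the source cluster alias, so a hypothetical topic named orders becomes SiteA_orders. You can confirm replication is flowing by listing topics against a Site B broker:
bin/kafka-topics.sh --bootstrap-server <Broker_List_SiteB> --list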
The correct configuration if you are using IPs:
Define logical cluster names (e.g., SiteA and SiteB):
clusters=SiteA,SiteB
Map each cluster to its brokers using the bootstrap.servers property:
SiteA.bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092
SiteB.bootstrap.servers=192.168.2.1:9092,192.168.2.2:9092
Why Use Logical Names?
The clusters property allows you to define reusable names for clusters in your MirrorMaker 2 configuration. These logical names make the configuration easier to read and maintain, especially when managing multiple clusters. The actual IP addresses or hostnames are specified in the bootstrap.servers setting for each cluster.
Example Configuration:
Here’s how the configuration might look with IP addresses:
clusters=SiteA,SiteB
# Site A Kafka cluster
SiteA.bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092
SiteA.consumer.group.id=mm2-consumer-SiteA
SiteA.offset.storage.topic=mm2-offsets-SiteA
SiteA.config.storage.topic=mm2-configs-SiteA
SiteA.status.storage.topic=mm2-status-SiteA
# Site B Kafka cluster
SiteB.bootstrap.servers=192.168.2.1:9092,192.168.2.2:9092
SiteB.consumer.group.id=mm2-consumer-SiteB
SiteB.offset.storage.topic=mm2-offsets-SiteB
SiteB.config.storage.topic=mm2-configs-SiteB
SiteB.status.storage.topic=mm2-status-SiteB
# Replication settings
SiteA->SiteB.enabled=true
# Set to true only when failing back from Site B to Site A:
SiteB->SiteA.enabled=false
replication.policy.separator=_
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
Step 2: Create a service unit file
Use sudo to create a new unit file under /etc/systemd/system/. For example, create mirror-maker.service (the name used by the systemctl commands below):
[Unit]
Description=Kafka MirrorMaker 2 Service
After=network.target
[Service]
Type=simple
User=ubuntu
Group=ubuntu
ExecStart=/path/to/kafka/bin/connect-mirror-maker.sh /path/to/mm2.properties
Restart=on-failure
RestartSec=5
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
WorkingDirectory=/path/to/kafka
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=mirror-maker
[Install]
WantedBy=multi-user.target
Create the unit on both nodes, but enable and start it only on Site A, not Site B.
Step 3: Start the services
Start the services for both nodes
sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl enable zookeeper
sudo systemctl start kafka
sudo systemctl enable kafka
sudo systemctl enable mirror-maker.service # Site A only; on Site B you do not want data flowing back
sudo systemctl start mirror-maker.service # Site A only
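To confirm each broker has registered, you can query Zookeeper's /brokers/ids znode (a quick check, assuming Zookeeper is reachable on localhost:2181); the output lists the ids of the brokers registered with that Zookeeper:
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids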
Failover Handling
Increased Partitions During Failover:
Sync Partition Counts:
If Site A fails and Site B adds partitions to a topic, manually update Site A to match Site B's partition count when it comes back online (Kafka can only increase a topic's partition count, never decrease it):
sudo systemctl stop mirror-maker.service
bin/kafka-topics.sh --bootstrap-server <Node1_IP>:9092 --alter --topic <topic-name> --partitions <new-partition-count>
Resume MirrorMaker 2:
Restart MirrorMaker 2 on the recovered node to resume replication:
sudo systemctl restart mirror-maker.service
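To verify replication end to end after recovery, you can produce a test message on Site A and consume the mirrored topic on Site B (using a hypothetical topic named orders and the illustrative IPs from earlier):
bin/kafka-console-producer.sh --bootstrap-server 192.168.1.1:9092 --topic orders
bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.1:9092 --topic SiteA_orders --from-beginning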