Apache Kafka - Setting up for cluster (Part 2)


This article is meant for a production environment where Kafka runs across multiple availability zones or regions. The goal is to provide a failover setup.

In this guide, we will use two nodes, one per site (Site A and Site B).

Step 1: Install Java (on both machines)

sudo apt install openjdk-21-jdk -y

Next, check the Java version:

java -version

Step 2: Download and Extract Kafka

wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xzf kafka_2.13-3.9.0.tgz
mv kafka_2.13-3.9.0 kafka
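Optionally, verify the download before extracting by comparing the digest from sha512sum against the checksum published alongside the tarball on downloads.apache.org. The sketch below demonstrates the flow on a stand-in file so it can be run anywhere; on the real node you would run sha512sum against kafka_2.13-3.9.0.tgz itself.

```shell
# Demo of the verification flow using a stand-in file (runnable anywhere).
# On the real node: sha512sum kafka_2.13-3.9.0.tgz, then compare the digest
# with the published value for the release.
printf 'stand-in tarball\n' > demo.tgz
sha512sum demo.tgz > demo.tgz.sha512   # record the digest
sha512sum -c demo.tgz.sha512           # re-check it; prints "demo.tgz: OK"
```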

Step 3: Create separate data directories for Kafka messages and Zookeeper data.

Create new directories for Kafka and Zookeeper:

sudo mkdir -p /kafka
sudo mkdir -p /zookeeper

Change ownership of those directories; we are using ubuntu as the user:

sudo chown -R ubuntu:ubuntu /kafka
sudo chown -R ubuntu:ubuntu /zookeeper

Next, create and edit zookeeper.properties. Replace zookeeper1 and zookeeper2 with the node IPs, or with hostnames if you have DNS set up.

nano ~/kafka/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/zookeeper
# the port at which the clients will connect
clientPort=2181
# setting number of connections to unlimited
maxClientCnxns=0
# keeps a heartbeat of zookeeper in milliseconds
tickTime=2000
# time for initial synchronization
initLimit=10
# how many ticks can pass before timeout
syncLimit=5
# define servers ip and internal ports to zookeeper
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
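One step the properties file alone does not cover: each ZooKeeper node must also have a myid file inside dataDir whose content matches its server.N entry, or the ensemble will not form. A sketch (writing to a temp dir so it can be run anywhere; on the real machines the directory is /zookeeper):

```shell
# Each ensemble member needs dataDir/myid matching its server.N entry.
# Demo uses a temp dir; on node 1 the real path is /zookeeper/myid.
ZK_DATA_DIR="$(mktemp -d)"
echo 1 > "$ZK_DATA_DIR/myid"    # node 1 (zookeeper1); use "echo 2" on node 2
cat "$ZK_DATA_DIR/myid"
```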

For Kafka:

  • Replace <Zookeeper_Cluster_SiteA> with the connection string for Site A's Zookeeper cluster.

  • For Site B, use <Zookeeper_Cluster_SiteB>.

  • Change the broker.id for Site B to 2.

nano ~/kafka/config/server.properties
broker.id=1
log.dirs=/kafka
zookeeper.connect=<Zookeeper_Cluster_SiteA>
listeners=PLAINTEXT://<Broker_IP>:9092
advertised.listeners=PLAINTEXT://<Broker_IP>:9092
num.network.threads=3
num.io.threads=8
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
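As a quick sanity check across the two sites, you can confirm that the settings that must differ per node (such as broker.id) really do. A hypothetical sketch using stand-in fragments of each node's server.properties:

```shell
# Stand-in server.properties fragments for the two sites (hypothetical files).
cat > site_a.properties <<'EOF'
broker.id=1
log.dirs=/kafka
EOF
cat > site_b.properties <<'EOF'
broker.id=2
log.dirs=/kafka
EOF
id_a=$(grep '^broker\.id=' site_a.properties | cut -d= -f2)
id_b=$(grep '^broker\.id=' site_b.properties | cut -d= -f2)
if [ "$id_a" != "$id_b" ]; then
  echo "broker ids are distinct: $id_a vs $id_b"
fi
```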

Step 4: Setting up Kafka and Zookeeper as systemd services

Setting up Zookeeper systemd

sudo nano /etc/systemd/system/zookeeper.service

Add the following:

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
After=network.target

[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/zookeeper-server-start.sh /home/ubuntu/kafka/config/zookeeper.properties
ExecStop=/home/ubuntu/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Create a Kafka systemd unit:

sudo nano /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
After=network.target zookeeper.service

[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/kafka-server-start.sh /home/ubuntu/kafka/config/server.properties
ExecStop=/home/ubuntu/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

2. Configuring MirrorMaker 2 for Failover

Step 1: MirrorMaker 2 Setup (install on both nodes)

  • Deploy MirrorMaker 2 on both nodes.

  • Create a configuration file (mm2.properties):

      # Source cluster
      clusters=SiteA,SiteB
    
      # Site A configuration
      SiteA.bootstrap.servers=<Broker_List_SiteA>
      SiteA.consumer.group.id=mm2-consumer
      SiteA.offset.storage.topic=mm2-offsets
      SiteA.config.storage.topic=mm2-configs
      SiteA.status.storage.topic=mm2-status
    
      # Site B configuration
      SiteB.bootstrap.servers=<Broker_List_SiteB>
      SiteB.consumer.group.id=mm2-consumer
      SiteB.offset.storage.topic=mm2-offsets
      SiteB.config.storage.topic=mm2-configs
      SiteB.status.storage.topic=mm2-status
    
      # Replication policies
      replication.policy.separator=_
      replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    
      # Enable replication
      SiteA->SiteB.enabled=true
      #SiteB->SiteA.enabled=true  #activate only when failover from SiteB to SiteA
    
  • Start MirrorMaker 2:

      bin/connect-mirror-maker.sh mm2.properties
    

    Correct configuration if you are using IPs:

    1. Define logical cluster names (e.g., SiteA and SiteB):

       clusters=SiteA,SiteB
      
    2. Map each cluster to its brokers using the bootstrap.servers property:

       SiteA.bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092
       SiteB.bootstrap.servers=192.168.2.1:9092,192.168.2.2:9092
      

Why Use Logical Names?

The clusters property allows you to define reusable names for clusters in your MirrorMaker 2 configuration. These logical names make the configuration easier to read and maintain, especially when managing multiple clusters. The actual IP addresses or hostnames are specified in the bootstrap.servers setting for each cluster.

Example Configuration:

Here’s how the configuration might look with IP addresses:

    clusters=SiteA,SiteB

    # Site A Kafka cluster
    SiteA.bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092
    SiteA.consumer.group.id=mm2-consumer-SiteA
    SiteA.offset.storage.topic=mm2-offsets-SiteA
    SiteA.config.storage.topic=mm2-configs-SiteA
    SiteA.status.storage.topic=mm2-status-SiteA

    # Site B Kafka cluster
    SiteB.bootstrap.servers=192.168.2.1:9092,192.168.2.2:9092
    SiteB.consumer.group.id=mm2-consumer-SiteB
    SiteB.offset.storage.topic=mm2-offsets-SiteB
    SiteB.config.storage.topic=mm2-configs-SiteB
    SiteB.status.storage.topic=mm2-status-SiteB

    # Replication settings
    SiteA->SiteB.enabled=true
    #SiteB->SiteA.enabled=true  #activate only when failing over from Site B to Site A
    replication.policy.separator=_
    replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
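With DefaultReplicationPolicy, mirrored topics on the target cluster are renamed by prefixing the source cluster alias plus the configured separator. With the settings above, a topic named orders on Site A shows up on Site B under the prefixed name:

```shell
# DefaultReplicationPolicy target-topic name: <source-alias><separator><topic>
src_alias="SiteA"; sep="_"; topic="orders"
echo "${src_alias}${sep}${topic}"    # prints: SiteA_orders
```

Consumers on Site B therefore read the mirrored data from SiteA_orders, not orders.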

Step 2: Create a service unit file. Use sudo to create a new unit file under /etc/systemd/system/, for example mirror-maker.service:

      [Unit]
      Description=Kafka MirrorMaker 2 Service
      After=network.target
    
      [Service]
      Type=simple
      User=ubuntu
      Group=ubuntu
      ExecStart=/path/to/kafka/bin/connect-mirror-maker.sh /path/to/mm2.properties
      Restart=on-failure
      RestartSec=5
      Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
      WorkingDirectory=/path/to/kafka
      StandardOutput=syslog
      StandardError=syslog
      SyslogIdentifier=mirror-maker
    
      [Install]
      WantedBy=multi-user.target
    

    Create this unit file on both nodes, but enable and start it only on Site A, not Site B.

Step 3: Start the services

Start the services on both nodes:

sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl enable zookeeper
sudo systemctl start kafka
sudo systemctl enable kafka
sudo systemctl enable mirror-maker.service  # Site A only; on Site B you do not want data flowing back
sudo systemctl start mirror-maker.service   # Site A only

Failover Handling

Increased Partitions During Failover:

  1. Sync Partition Counts:

    • If Site A fails and Site B adds partitions, manually update Site A to match the partition count when it comes back online:

        systemctl stop mirror-maker.service
        bin/kafka-topics.sh --bootstrap-server <Node1_IP>:9092 --alter --topic <topic-name> --partitions <new-partition-count>
      
  2. Resume MirrorMaker 2:

    • Restart MirrorMaker 2 on the recovered node to resume replication:

        systemctl restart mirror-maker.service
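Note that Kafka can only increase a topic's partition count, never decrease it, so the alter step applies only when Site B's count has grown past Site A's. A small guard sketch (partition counts hard-coded here for illustration; in practice you would read them from the output of kafka-topics.sh --describe):

```shell
# Assumed values; in practice parse them from:
#   bin/kafka-topics.sh --bootstrap-server <ip>:9092 --describe --topic <topic-name>
site_a_parts=3
site_b_parts=5
if [ "$site_b_parts" -gt "$site_a_parts" ]; then
  echo "alter Site A topic to $site_b_parts partitions"
else
  echo "partition counts already in sync"
fi
```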