
Kafka and ZooKeeper Setup



ZooKeeper download link: https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz

Configuration:

conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# whitelist of four-letter-word commands; * enables all of them
4lw.commands.whitelist=*

Start ZooKeeper Server

$ bin/zkServer.sh start
Check ZooKeeper status:

dheeraj.kumar@Dheeraj-Kumar bin % echo stat | nc localhost 2181


stat is one of the four-letter-word commands enabled by the whitelist setting above.


Check whether Kafka brokers are registered in ZooKeeper:

echo dump | nc localhost 2181 | grep broker


Responsibility of the leader in ZooKeeper:

1. Coordinates the ensemble: every write request goes through the leader, which orders it and replicates it to the followers.



Check which Kafka brokers are running:


echo dump | nc localhost 2181 | grep brokers


It should return:


dheeraj.kumar@Dheeraj-Kumar ~ % echo dump | nc localhost 2181 | grep brokers

/brokers/ids/3

/brokers/ids/1

/brokers/ids/2



Check which broker is the Kafka controller:



echo dump | nc localhost 2181


It should return:


dheeraj.kumar@Dheeraj-Kumar ~ % echo dump | nc localhost 2181               

SessionTracker dump:

Global Sessions(3):

0x20007d77f190000 18000ms

0x30007d784620000 18000ms

0x30007d784620001 18000ms

ephemeral nodes dump:

Sessions with Ephemerals (3):

0x30007d784620001:

/brokers/ids/3

0x30007d784620000:

/controller

/brokers/ids/1

0x20007d77f190000:

/brokers/ids/2

Connections dump:

Connections Sets (1)/(1):

1 expire at Tue Jan 16 17:41:49 GST 2024:

ip: /127.0.0.1:53928 sessionId: 0x0
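In the dump above, /controller appears in the same session block as /brokers/ids/1, so broker 1 is the controller. A minimal sketch for reading this off automatically, assuming the dump layout shown above (a session header line followed by that session's ephemeral nodes); the function name controller_broker is made up for this example:

```shell
# Print the broker ID that shares a ZooKeeper session with the /controller znode.
# Reads the text produced by the `dump` four-letter command on stdin.
controller_broker() {
  awk '
    $1 ~ /^0x[0-9a-f]+:$/    { session = $1; next }   # header of a session block
    $1 == "/controller"      { ctrl = session }        # session that owns /controller
    $1 ~ /^\/brokers\/ids\// { n = split($1, parts, "/"); broker[session] = parts[n] }
    END { if (ctrl != "" && (ctrl in broker)) print broker[ctrl] }
  '
}
```

Usage: echo dump | nc localhost 2181 | controller_broker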



Topic creation:


bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --create --topic topic111 --partitions 3 --replication-factor 3



It should display:


Created topic topic111.


Now describe the topic:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --describe --topic topic111  


It should display:


Topic: topic111 TopicId: rdNujlz3RJCyt02Mc7X_Og PartitionCount: 3 ReplicationFactor: 3 Configs: 

Topic: topic111 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

Topic: topic111 Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2

Topic: topic111 Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
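In the output above every partition has an Isr list as long as its Replicas list, which means all replicas are in sync. A rough sketch for flagging partitions where that is not the case, assuming the describe output has the label/value layout shown above (the function name under_replicated is hypothetical):

```shell
# Print the partition numbers whose ISR is smaller than the replica list.
# Reads `kafka-topics.sh --describe` output on stdin.
under_replicated() {
  awk '
    {
      p = ""; r = ""; s = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") p = $(i + 1)
        if ($i == "Replicas:")  r = $(i + 1)
        if ($i == "Isr:")       s = $(i + 1)
      }
      if (p == "") next                       # topic summary line, no partition
      if (split(s, a, ",") < split(r, b, ",")) print p
    }
  '
}
```

Usage: pipe the describe command into under_replicated; no output means every replica is in sync. kafka-topics.sh also has a built-in --under-replicated-partitions option for --describe, which is the safer choice when it is available.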



A Kafka cluster has exactly one active controller at a time. It manages the state of partitions and replicas and performs administrative tasks such as reassigning partitions.

The states of a partition are:


NonExistentPartition: the partition was never created, or it was created and then deleted.

NewPartition: the partition has been created but has no leader or ISR yet.

OnlinePartition: a leader has been elected for the partition.

OfflinePartition: a leader was elected earlier, but the leader has since died.


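The states of a replica are:

NewReplica: the controller has just created the replica, for example during partition reassignment; it can only become a follower.

OnlineReplica: the replica has started and is part of the assigned replicas for its partition; it can become a leader or a follower.

OfflineReplica: the broker hosting the replica has gone down.

NonExistentReplica: the replica has been deleted.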


A partition moves between these states as the cluster changes, for example from NewPartition to OnlinePartition once a leader has been elected for it.
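Only some transitions between the four partition states are allowed. The sketch below encodes the allowed previous states as I understand them from the controller's partition state machine in ZooKeeper mode; treat the table as an assumption to check against your Kafka version. The function name valid_partition_transition is made up for this example.

```shell
# Return 0 if a partition may move from state $1 to state $2,
# 1 if the transition is not allowed, 2 if the target state is unknown.
valid_partition_transition() {
  from=$1
  to=$2
  case "$to" in
    NewPartition)         allowed="NonExistentPartition" ;;
    OnlinePartition)      allowed="NewPartition OnlinePartition OfflinePartition" ;;
    OfflinePartition)     allowed="NewPartition OnlinePartition OfflinePartition" ;;
    NonExistentPartition) allowed="OfflinePartition" ;;
    *) return 2 ;;
  esac
  for s in $allowed; do
    [ "$s" = "$from" ] && return 0
  done
  return 1
}
```

For example, valid_partition_transition NewPartition OnlinePartition succeeds, while going straight from NonExistentPartition to OnlinePartition does not, because a partition has to pass through NewPartition first.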


We can increase the number of partitions of a topic but never decrease it, because removing a partition would lose the data stored in it.


Alter partitions:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --alter --topic topic111 --partitions 4



Partition Re-assignment:

1. Move partitions across brokers

2. Selectively move replicas of partitions to a specific set of brokers

3. Increase the replication factor




Move partitions:


1. First generate the proposed reassignment JSON:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file /Users/dheeraj.kumar/Documents/apache-kafka-and-zookeeper/apache-zookeeper-3.8.3-bin_1/topicToMove.json --broker-list "3,2,1" --generate
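The contents of topicToMove.json are not shown in this post. A minimal version for the topic used here would look roughly like the following; it is written to the current directory, so adjust the path to match the one passed to --topics-to-move-json-file.

```shell
# Write the topics-to-move file read by --topics-to-move-json-file.
cat > topicToMove.json <<'EOF'
{
  "version": 1,
  "topics": [
    { "topic": "topic111" }
  ]
}
EOF
```

More topics can be listed by adding further { "topic": ... } entries to the topics array.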


Output of the above command:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file /Users/dheeraj.kumar/Documents/apache-kafka-and-zookeeper/apache-zookeeper-3.8.3-bin_1/topicToMove.json --broker-list "3,2,1" --generate 

Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic111","partition":0,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":1,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}


Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"topic111","partition":0,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":1,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":2,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":3,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}
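Only the second JSON document, the proposed one, goes into the file used for --execute. Instead of copying it by hand, it can be filtered out of the --generate output. A small sketch, assuming the output has the heading and single-line JSON layout shown above (the function name proposed_reassignment is hypothetical):

```shell
# Print only the proposed reassignment JSON from `--generate` output on stdin.
proposed_reassignment() {
  awk '
    /^Proposed partition reassignment configuration/ { found = 1; next }
    found && /^[{]/ { print; exit }   # first JSON line after the heading
  '
}
```

Usage: append | proposed_reassignment > suggestedChange.json to the --generate command. It is worth keeping the "Current partition replica assignment" JSON as well, since it is what a rollback would use.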



2. Now save the proposed reassignment JSON to a new file (suggestedChange.json here) and run the command below:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /Users/dheeraj.kumar/Documents/apache-kafka-and-zookeeper/apache-zookeeper-3.8.3-bin_1/suggestedChange.json --execute

Current partition replica assignment


{"version":1,"partitions":[{"topic":"topic111","partition":0,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":1,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"topic111","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}


Save this to use as the --reassignment-json-file option during rollback

Successfully started partition reassignments for topic111-0,topic111-1,topic111-2,topic111-3



3. Now verify the reassignment using the command below:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /Users/dheeraj.kumar/Documents/apache-kafka-and-zookeeper/apache-zookeeper-3.8.3-bin_1/suggestedChange.json --verify 

Status of partition reassignment:

Reassignment of partition topic111-0 is completed.

Reassignment of partition topic111-1 is completed.

Reassignment of partition topic111-2 is completed.

Reassignment of partition topic111-3 is completed.


Clearing broker-level throttles on brokers 1,2,3

Clearing topic-level throttles on topic topic111





Change the replication factor:


Use the same --execute command:


dheeraj.kumar@Dheeraj-Kumar kafka_2.13-3.3.1_1 % bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /Users/dheeraj.kumar/Documents/apache-kafka-and-zookeeper/apache-zookeeper-3.8.3-bin_1/suggestedChange.json --execute


but first edit the JSON file so that each partition's replicas list contains the desired number of broker IDs.
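As an illustration, the file below lowers the replication factor of topic111 from 3 to 2 by dropping the last broker from each replicas list of the proposed assignment. Which broker to drop is an arbitrary choice made for this example; the first ID in each list is the preferred leader. The optional log_dirs field is left out.

```shell
# Reassignment file with two replicas per partition (replication factor 2).
cat > suggestedChange.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "topic111", "partition": 0, "replicas": [2, 1] },
    { "topic": "topic111", "partition": 1, "replicas": [3, 2] },
    { "topic": "topic111", "partition": 2, "replicas": [1, 3] },
    { "topic": "topic111", "partition": 3, "replicas": [2, 3] }
  ]
}
EOF
```

To raise the replication factor instead, add broker IDs to each list; the list cannot contain more IDs than there are brokers in the cluster. After --execute, the --describe command should report the new ReplicationFactor.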



 
