source: quanta magazine

Using JAVA & Zookeeper to build a distributed key value store

Abhijit Mondal

--

In this post, I am going to build a simple distributed key-value store using JAVA and sockets for networking.

I am going to show how to use Zookeeper as a coordination service in a cluster with multiple partitions and replication.

Following are the functions of Zookeeper service in this system:

  1. Maintaining the mapping from server to partition, i.e. which servers come under partition ‘i’. This data can also be used to infer which servers are replicas of one another.
  2. Maintaining the leader for each partition.
  3. Maintaining the latest sequence number of the commit log entries. The purpose of this data is explained below.

Do we really need Zookeeper or some other service for coordination?

In one of my earlier posts, I showed how we can use the Raft consensus protocol to build a distributed key-value store. There we did not require any additional coordination service.

Understanding and Implementing Raft Consensus Protocol in Python | by Abhijit Mondal | Medium

  • Leader election happens in Raft by passing messages among the replicas. Each server votes either for itself or for another server whose log is at least as up to date as its own.
    A server becomes the leader when it receives votes from a majority.
  • We need to inform the leader about the addresses of its replicas so that communication can happen.

Raft is useful if we do not want the overhead of an additional service, but it also has some drawbacks:

  • Leader election relies on randomized timeouts, so a given election round may or may not elect a leader (for example, when the vote is split). If a round fails to elect a leader, re-election happens until a leader is elected.
  • All reads and writes go through the leader. This favors strong consistency but hurts performance, since only one server per partition (the leader) serves all client read/write requests.
  • In Zookeeper, all writes go through the leader, but reads can be served by any of the replicas.
    This lets reads scale with the number of replicas, but it does not deliver strong consistency, since the replica responding to a read request might have stale data.

For example, if one Zookeeper replica can serve 10K reads per second, then 10 replicas can together serve about 100K reads per second, whereas with Raft the partition will still serve 10K requests per second or lower because of the additional overhead of coordinating with 10 replicas.

Some Zookeeper Fundamentals

Think of Zookeeper as more or less a distributed key-value store itself. However, the data size per entry is limited to 1MB, so it is suitable for storing metadata and configuration-related data.

For example:

  1. Server addresses
  2. Database related configurations and parameters
  3. Constants used by multiple servers
  4. Configurations specific to tenants in a multi-tenant system
  5. … and so on.

Instead of a standard hash table, it stores the entries in a tree-like data structure. Each node of the tree is referred to as a Znode.

The tree like structure resembles a filesystem and thus it allows some features found in filesystems such as:

  1. A Znode with children is similar to a directory and thus one can add children, delete children or delete the entire directory in one go.
  2. The children of a Znode can be listed efficiently.
  3. Permissions can be set on each Znode.

For e.g. if my service has 3 partitions and each partition has 3 replicas, then we can create the following tree like structure:

/app
/app/<partition_id>
/app/<partition_id>/replicas
/app/<partition_id>/replicas/<server_address>
Zookeeper Znode Tree Structure

The benefits of having this structure instead of a hash table are:

  1. Can list all the servers for a given partition id in one query.
  2. Can list all partitions for the given cluster in one query.

To do the same with a hash table we need to issue 3 queries to get replicas for each partition and 3 queries to get all the partition ids.
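To make the "one query" point concrete, here is a minimal in-memory sketch of listing the direct children of a path. It is not the Zookeeper client API: the class `ZnodeTreeSketch` and its methods `create` and `children` are hypothetical names used only for illustration, and the paths are kept in a sorted set so that everything under a prefix is contiguous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for a znode tree; not the ZooKeeper client API.
class ZnodeTreeSketch {
    // Sorted set of full paths, so all descendants of a path sit next to each other.
    private final TreeSet<String> paths = new TreeSet<>();

    void create(String path) {
        paths.add(path);
    }

    // Returns the names of the direct children of the given path in one scan.
    List<String> children(String parent) {
        String prefix = parent.endsWith("/") ? parent : parent + "/";
        List<String> result = new ArrayList<>();
        // tailSet starts at the first path >= prefix; stop once the prefix no longer matches.
        for (String p : paths.tailSet(prefix)) {
            if (!p.startsWith(prefix)) break;
            String rest = p.substring(prefix.length());
            // Keep direct children only, skip deeper descendants.
            if (!rest.isEmpty() && !rest.contains("/")) result.add(rest);
        }
        return result;
    }
}
```

With this layout, `children("/app")` lists the partition ids and `children("/app/1/replicas")` lists the servers of partition 1, each with a single call.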

There are 2 main types of Znodes: persistent and ephemeral.

  1. A persistent znode remains in Zookeeper even if the server which created it dies.
  2. An ephemeral znode gets deleted if the server which created it dies. Ephemeral znodes cannot have any children.

To monitor changes in the tree structure or changes in the data associated with each Znode, we can leverage Watchers.

For e.g. if we want to monitor for all replicas added or removed from a partition, we can write an infinite loop and keep checking inside the loop with some interval (in a different thread) or can leverage Watchers.

private static ZooKeeper zkeeper;
private static List<String> replicas = new ArrayList<String>();

public static void getReplicas(int partitionId) {
String path = "/app/" + partitionId + "/replicas";
try {
replicas = zkeeper.getChildren(path, null);
} catch (Exception e) {
e.printStackTrace();
}
}

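public static void monitorChanges(int partitionId) {
// Refresh all replicas every 2 seconds
while(true) {
getReplicas(partitionId);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// Restore the interrupt flag and stop polling
Thread.currentThread().interrupt();
return;
}
}
}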

public static void main(String[] args) {
new Thread(() -> monitorChanges(1)).start();
}

Watchers do the same thing, except that we do not need to explicitly monitor the Znodes in a separate thread. Whenever an event we are interested in occurs, Zookeeper sends a notification to the client application through the watcher.

public static void getReplicas(int partitionId) {
try {
String path = "/app/" + partitionId + "/replicas";

// If watching, then check if any new child is added or a
// child is removed, if changed, then recompute and call the
// function recursively.
replicas = zkeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
getReplicas(partitionId);
}
}
});

} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
getReplicas(1);
}

The advantage of the above implementation is that replicas will be recomputed only when there is a change in the set of replicas.

We call the getReplicas() method again inside the watcher because a watcher triggers only once. Thus, to keep receiving notifications, we need to re-register the watcher by calling getReplicas() again.
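The one-shot behaviour can be simulated without a Zookeeper server. The following is only a sketch: `OneShotWatchSketch` and `ReplicaTracker` are hypothetical classes that mimic "a watch fires once and is then consumed", and they are not part of the Zookeeper API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory node that mimics one-shot watch semantics; not the ZooKeeper API.
class OneShotWatchSketch {
    private final List<String> children = new ArrayList<>();
    private Runnable watcher; // at most one pending watch, cleared when it fires

    // Reads the children and leaves a watch that fires once on the next change.
    synchronized List<String> getChildren(Runnable onChange) {
        watcher = onChange;
        return new ArrayList<>(children);
    }

    void addChild(String name) {
        Runnable toFire;
        synchronized (this) {
            children.add(name);
            toFire = watcher;
            watcher = null; // the watch is consumed by this change
        }
        if (toFire != null) toFire.run();
    }
}

class ReplicaTracker {
    final OneShotWatchSketch node;
    volatile List<String> replicas = new ArrayList<>();
    int refreshes = 0;

    ReplicaTracker(OneShotWatchSketch node) {
        this.node = node;
    }

    // Re-reading inside the watcher re-registers the watch for the next change.
    void refresh() {
        refreshes++;
        replicas = node.getChildren(this::refresh);
    }
}
```

If `refresh()` did not pass itself as the watcher again, only the first change after the initial read would be noticed.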

The Zookeeper API allows us to read and write both synchronously as well as asynchronously.

In asynchronous reads and writes we use a Callback, i.e. a function which gets called by the Zookeeper client when the data has been written (or the write has failed) or is ready to be read. In this way, reads and writes return immediately without making the client wait, and the callback function is invoked later when the result is available.

For example, the above code for getting all replicas is synchronous, i.e. the calling thread is blocked. We could call the getReplicas() method in a separate thread, but threads are expensive.

Here is an asynchronous implementation for the above:

private static Set<String> replicas = new HashSet<String>();

public static void getReplicas(int partitionId) {
try {
String path = "/app/" + partitionId + "/replicas";

zkeeper.getChildren(
path,
new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
getReplicas(partitionId);
}
}
},
new ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch(Code.get(rc)) {
case OK:
// children is a List, so copy it into the Set field
replicas = new HashSet<String>(children);
break;
case CONNECTIONLOSS:
getReplicas(partitionId);
break;
default:
break;
}
}
},
null // ctx: optional context object handed back to the callback
);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
getReplicas(1);
}

What happens in the above implementation is as follows:

  1. When getReplicas is called, it returns immediately after setting the watcher and registering the ChildrenCallback.
  2. The result of getChildren is not returned immediately. Whenever Zookeeper is ready with the response, it calls the callback’s processResult method with the result, i.e. the children znodes.
  3. Meanwhile, the watcher monitors for any changes and calls getReplicas again whenever there is a change.
  4. If the connection is lost due to network issues, we call getReplicas again so that the read is retried and the watcher is set again.

Using with a Distributed Key Value Store

Here is the step by step overview of the distributed key-value system:

Distributed Systems with Zookeeper for Coordination

Create Znodes

When a server joins the cluster, it will create the following Znodes:

/partitions ---> dummy data
/partitions/<partition_id> ---> dummy data
/partitions/<partition_id>/replicas ---> dummy data
/partitions/<partition_id>/replicas/<server_address> ---> -1

The last znode above is ephemeral i.e. this znode will be deleted if the server which created it crashes or is not responding to Zookeeper heartbeats.

All other znodes are persistent.

The data corresponding to the last znode is the latest sequence number in the commit log of the server.

Note that the first 3 znodes are common to all replicas in a given partition, so only one of them will succeed in creating these znodes. The other replicas will see that these znodes already exist and will do nothing except create the last znode, which is server specific.

public static void createZnodes() {
try {
byte[] data = "Hello".getBytes();

zkmanager.create("/partitions", data, true, false);
zkmanager.create("/partitions/" + partitionId, data, true, false);
zkmanager.create("/partitions/" + partitionId + "/replicas", data, true, false);
zkmanager.create("/partitions/" + partitionId + "/replicas/" + hostPort, "-1".getBytes(), false, false);

} catch (Exception e) {
e.printStackTrace();
}
}

partitionId is the partition id assigned to the server and hostPort is the server address in the form IPv4:Port.

The create method for zkmanager looks like:

public int create(String path, 
byte[] data,
boolean isPersistent,
boolean isSequential) throws KeeperException,
InterruptedException {

// Check if the znode already exists
Stat stat = zkeeper.exists(path, true);

if (stat == null) {
// The znode does not exist yet

if (isPersistent) {
// Create persistent znode
zkeeper.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
else {
if (isSequential) {
// Create ephemeral sequential znode
zkeeper.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
else {
// Create ephemeral non sequential znode
zkeeper.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
}
return 1;
}

return -1;
}

The complete zkmanager class is implemented here in my github repo :

zookeeper-kv-store/src/main/java/com/example/ZKManager.java at main · funktor/zookeeper-kv-store (github.com)

Get all replicas for new leader

Get other replicas in the same partition

We need to continuously monitor for existing replicas in the partition because the commit log and the state of the leader node must be replicated asynchronously to all the replicas.

We can get and monitor all replicas using the code we saw earlier (with a watcher and callback):

private Set<String> replicas = new ConcurrentSkipListSet<String>();
private Map<String, Integer> logSeq = new ConcurrentHashMap<String, Integer>();

public void getReplicas(int partitionId) {
try {
String path = "/partitions/" + partitionId + "/replicas";

zkmanager.getZNodeChildrenAsync(
path,
new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
getReplicas(partitionId);
}
}
},
new ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch(Code.get(rc)) {
case OK:
replicas = new ConcurrentSkipListSet<String>(children);
logSeq.clear();

for (String replica : replicas) {
getLogSequence(partitionId, replica);
}

break;

case CONNECTIONLOSS:
getReplicas(partitionId);
break;

default:
break;
}
}
}
);
} catch (Exception e) {
e.printStackTrace();
}
}

When the list of replicas is returned by Zookeeper, we also fetch the commit log sequence for each replica, i.e. the latest sequence number in its commit log.

This is useful only to the current leader of the partition, though.

The method getLogSequence is implemented as follows:

public void getLogSequence(int partitionId, String replica) {
try {
String path = "/partitions/" + partitionId + "/replicas/" + replica;

zkmanager.getZNodeDataAsync(
path,
new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
getLogSequence(partitionId, replica);
}
}
},
new DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch(Code.get(rc)) {
case OK:
try {
String d = new String(data, "UTF-8");
logSeq.put(replica, Integer.parseInt(d));

} catch (Exception e) {
e.printStackTrace();
}
break;

case NONODE:
break;

case CONNECTIONLOSS:
getLogSequence(partitionId, replica);
break;

default:
break;
}
}
}
);
} catch (Exception e) {
e.printStackTrace();
}
}

The updateSequence method also uses Zookeeper to update the log sequence data asynchronously.

public static void updateSequence(int partitionId, 
String replica, int sequence) {
try {
String path = "/partitions/" + partitionId + "/replicas/" + replica;
zkmanager.updateAsync(
path,
Integer.toString(sequence).getBytes(),
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch(Code.get(rc)) {
case OK:
break;

default:
updateSequence(partitionId, replica, sequence);
break;
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}

Create a copy of the Consistent Hashing Ring

Get other partitions and add those partitions into the Consistent Hashing data structure.

Our distributed key-value store uses Consistent Hashing based partitioning. I will not go over the details of the algorithm, which can be found in my earlier post on using Chord for DHT.

Deep dive into Chord for Distributed Hash Table | by Abhijit Mondal | Medium

The Consistent Hashing class in Java would look something like this:

import java.util.TreeMap;
import org.apache.commons.codec.digest.MurmurHash3;
import java.util.Map;

public class ConsistentHashing {
TreeMap<Integer, String> keyValueMap = new TreeMap<Integer, String>();

public int getHash(String value) {
// Using murmurhash3 for non-cryptographic hash function
return MurmurHash3.hash32x86(value.getBytes());
}

public void insert(String value) {
int key = getHash(value);
synchronized(this) {
if (!keyValueMap.containsKey(key)) {
keyValueMap.put(key, value);
}
}
}

public int getNextKey(String value, boolean greater) {
// when greater is true it implies find the next server
// with a strictly next 'greater' hash.

int key = getHash(value);

synchronized(this) {
if (!greater && keyValueMap.containsKey(key)) {
return key;
}

Map.Entry<Integer, String> entry = keyValueMap.higherEntry(key);

if (entry == null) {
// No greater hash exists, wrap around to the start of the ring
entry = keyValueMap.firstEntry();
}

return entry.getKey();
}
}

public String getNext(String value, boolean greater) {
int key = getNextKey(value, greater);
synchronized(this) {
return keyValueMap.get(key);
}
}

public void delete(String value) {
int key = getHash(value);
synchronized(this) {
if (keyValueMap.containsKey(key)) {
keyValueMap.remove(key);
}
}
}

public void clear() {
synchronized(this) {
keyValueMap.clear();
}
}

public boolean exists(String value) {
int key = getHash(value);
synchronized(this) {
return keyValueMap.containsKey(key);
}
}
}

We use the TreeMap data structure from the Java collections library to maintain the ordering of keys and servers in the ring.

For hashing, we use the MurmurHash3 class from the Apache Commons Codec maven dependency.
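As a usage illustration of the ring lookup, here is a small self-contained sketch. It is not the class above: `RingSketch`, `addPartition` and `ownerOf` are hypothetical names, and the hash function is injected (rather than MurmurHash3) so that the example can be run with fixed, made-up ring positions.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Minimal ring sketch; the hash function is injected so the example stays deterministic.
class RingSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final ToIntFunction<String> hash;

    RingSketch(ToIntFunction<String> hash) {
        this.hash = hash;
    }

    void addPartition(String partitionId) {
        ring.put(hash.applyAsInt(partitionId), partitionId);
    }

    // Owner of a key: first partition at or after the key's hash, wrapping to the start.
    String ownerOf(String key) {
        if (ring.isEmpty()) throw new IllegalStateException("ring has no partitions");
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash.applyAsInt(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

For instance, with partitions at positions 100 and 200, a key hashed to 150 is owned by the partition at 200, while a key hashed to 250 wraps around to the partition at 100.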

For continuously monitoring any partitions added to the cluster, we can use the following code:

private ConsistentHashing partitioner = new ConsistentHashing();

public void getPartitions() {
try {
String path = "/partitions";

zkeeper.getChildren(
path,
new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
getPartitions();
}
}
},
new ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch(Code.get(rc)) {
case OK:
for (String partitionId : children) {
partitioner.insert(partitionId);
}
break;

case CONNECTIONLOSS:
getPartitions();
break;

default:
break;
}
}
},
null // ctx: optional context object handed back to the callback
);
} catch (Exception e) {
e.printStackTrace();
}
}

Check Leader Exists

Check if a leader for the server’s own partition exists; if not, run the leader election algorithm:

public void leaderExists(int partitionId) {
try {
String path = "/partitions/" + partitionId + "/leader";

zkeeper.exists(
path,
new Watcher() {
@Override
public void process(WatchedEvent event) {
// Run for leader if current leader dies
if (event.getType() == EventType.NodeDeleted) {
runForLeader(partitionId);
}
}
},
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch(Code.get(rc)) {
case NONODE:
// No leader yet
runForLeader(partitionId);
break;

case CONNECTIONLOSS:
// Network issues, retry the check
leaderExists(partitionId);
break;

case OK:
// Leader already elected. The watcher registered above
// fires when the leader znode gets deleted.
break;

default:
break;
}
}
},
null // ctx: optional context object handed back to the callback
);
} catch (Exception e) {
e.printStackTrace();
}
}

The watcher above checks if the znode /partitions/partitionId/leader got deleted, which means that the leader node is dead and it is time for a new election.

The leader election algorithm:

public void runForLeader(int partitionId) {
try {
int maxSeq = Integer.MIN_VALUE;
String leader = null;

for (String replica : logSeq.keySet()) {
int seqs = logSeq.get(replica);

if ((seqs > maxSeq)
|| (seqs == maxSeq
&& leader != null
&& replica.compareTo(leader) < 0)) {

maxSeq = seqs;
leader = replica;
}
}

if (leader != null && leader.equals(hostPort)) {
zkmanager.createAsync(
"/partitions/" + partitionId + "/leader",
leader.getBytes(),
new StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch(Code.get(rc)) {
case CONNECTIONLOSS:
runForLeader(partitionId);
break;

case OK:
isLeader = true;
break;

case NODEEXISTS:
leaderExists(partitionId);
break;

default:
break;
}
}
},
false,
false);
}
else {
leaderExists(partitionId);
}
} catch (Exception e) {
e.printStackTrace();
}
}

The leader election algorithm works as follows:

  1. Each server in the partition fetches the list of replicas in the partition.
  2. For each replica, it gets the latest log sequence number in that replica’s commit log. A higher log sequence implies a more up-to-date log.
  3. The replica with the highest log sequence is chosen to be the leader.
  4. In case 2 or more replicas have the same log sequence, the replica whose address comes first lexicographically is chosen to be the leader.
  5. The replica which should be the leader creates the znode for the leader asynchronously: /partitions/partitionId/leader
  6. The data for the leader znode is the server address.
  7. Replicas which are not the chosen leader do not try to create the leader znode.
    This is important because the leader znode is ephemeral: if the leader node which created it dies, the znode gets deleted and re-election happens.
  8. All replicas which lost the election continue to monitor the leader znode.
  9. In the edge case where 2 replicas both conclude that they are the winner and try to create the leader znode, only one of them will succeed.
    The other one will then continue to monitor the leader znode.

Note that once a replica becomes the leader, it does not need to monitor whether the leader exists. As long as it is alive it remains the leader, and if it dies and restarts, the watcher will be re-initialized.
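The selection rule in steps 2 to 4 can be isolated as a pure function, which makes it easy to check independently of Zookeeper. This is a minimal sketch: `LeaderPickSketch` and `pickLeader` are hypothetical names, and the input map plays the role of the logSeq map (replica address to latest log sequence).

```java
import java.util.Map;

class LeaderPickSketch {
    // Highest log sequence wins; ties go to the lexicographically smallest address.
    static String pickLeader(Map<String, Integer> logSeq) {
        String leader = null;
        int maxSeq = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> e : logSeq.entrySet()) {
            String replica = e.getKey();
            int seq = e.getValue();
            boolean better = leader == null
                    || seq > maxSeq
                    || (seq == maxSeq && replica.compareTo(leader) < 0);
            if (better) {
                maxSeq = seq;
                leader = replica;
            }
        }
        return leader; // null when no replicas are known
    }
}
```

Every replica that evaluates this function on the same map reaches the same answer, which is why normally only one replica attempts to create the leader znode.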

Can there be 2 or more different leaders elected?

Suppose replicas A and B both have sequence number X, but A comes before B lexicographically, hence A thinks it is the winner. While A is trying to create the leader znode, B’s sequence gets updated to X+1, while A’s update is delayed and its sequence is still X. Now B will find that it is the leader and try to create the leader znode.

Here is a time ordered sequence of events:

Time T0 - Current leader dies deleting the leader znode

Time T1 - A detects that the leader does not exist, reads the replicas
and the log sequence numbers.

Time T2 - A finds that A & B have the highest sequence number X but
A is smaller than B lexicographically.

Time T3 - A tries to create the leader node but there is some network delay

Time T4 - B's sequence got updated to X+1

Time T5 - A's sequence got updated to X+1

Time T6 - Since there is no leader yet, B finds it has the highest sequence
number X+1, thus tries to create leader node.

But as we saw above only one of A or B will successfully create the leader znode. Thus after election we will have exactly one leader.
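The "only one create succeeds" guarantee is what makes this race harmless. The sketch below imitates it with an atomic put-if-absent on an in-memory map; `LeaderRaceSketch`, `tryBecomeLeader` and `currentLeader` are hypothetical names and the map is only a stand-in for Zookeeper's znode namespace.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the znode namespace; putIfAbsent plays the role of an atomic create.
class LeaderRaceSketch {
    private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

    // Returns true only for the caller whose create actually happened.
    boolean tryBecomeLeader(String partitionId, String hostPort) {
        String path = "/partitions/" + partitionId + "/leader";
        return znodes.putIfAbsent(path, hostPort) == null;
    }

    String currentLeader(String partitionId) {
        return znodes.get("/partitions/" + partitionId + "/leader");
    }
}
```

In the timeline above, whichever of A or B reaches the create first wins; the other sees the equivalent of NODEEXISTS and goes back to watching the leader znode.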

Different leader election strategies?

This was one of the various strategies by which we can elect a leader for a partition. Some other strategies could be:

  1. First come first served basis, i.e. each replica tries to create the ephemeral znode. The first replica that successfully creates the znode becomes the leader.
  2. Sequential znodes. Each replica creates a sequential ephemeral znode under /partitions/partitionId/election. For e.g. if there are 3 replicas with addresses ip_1, ip_2 and ip_3, then the following znodes could be created:
    /partitions/partitionId/election/ip_1_000002
    /partitions/partitionId/election/ip_2_000001
    /partitions/partitionId/election/ip_3_000003
    In this case, the znode with the lowest sequential number i.e. /partitions/partitionId/election/ip_2_000001 will become the leader.

The drawback of the above approaches is that they do not take into account which replica has the most up-to-date log. Thus when a stale replica becomes the leader, if all reads happen from the leader, some key-value entries could be missing.
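For the sequential znode strategy, picking the winner amounts to finding the smallest numeric suffix among the election znodes. A minimal sketch, assuming the names follow the `<address>_<sequence>` pattern used in the example above (`SequentialElectionSketch`, `sequenceOf` and `pickLeader` are hypothetical names):

```java
import java.util.List;

class SequentialElectionSketch {
    // Parses the numeric suffix that follows the last underscore in a znode name.
    static long sequenceOf(String znodeName) {
        return Long.parseLong(znodeName.substring(znodeName.lastIndexOf('_') + 1));
    }

    // The candidate with the lowest sequence number is treated as the leader.
    static String pickLeader(List<String> candidates) {
        String leader = null;
        for (String c : candidates) {
            if (leader == null || sequenceOf(c) < sequenceOf(leader)) {
                leader = c;
            }
        }
        return leader; // null when there are no candidates
    }
}
```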

Reconcile Keys from Next Partition

If the new server is the leader for its partition, it reconciles all keys from the leader of its “next” partition in the consistent hashing ring.

When a new partition is created, all keys that are currently allocated to the next partition in the ring (clockwise direction) but whose hashes come before the new partition (anti-clockwise direction) need to be re-assigned to the new partition and deleted from the old partition.

public static void reconcileKeys() {
try {
if (isLeader) {
// Get next partition leader
String partition = partitioner.getNext(partitionId, true);

if (!partition.equals(partitionId)) {
String nextNode = getLeaderForPartition(partition);

JSONObject jsonObj = new JSONObject();
UUID uuid = UUID.randomUUID();

jsonObj.put("operator", "RECONCILE-KEYS");
jsonObj.put("request_id", uuid.toString());

JSONObject dataObj = new JSONObject();
dataObj.put("partition", partitionId);
jsonObj.put("data", dataObj);

jsonObj.put("request_type", 0);
jsonObj.put("timestamp", System.currentTimeMillis());

sendMessage(jsonObj.toString() + "<EOM>", nextNode);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

The reconcile step also needs to happen periodically: if the next partition is also new and is still syncing keys from its own next partition, not all relevant keys will be present in it yet, so the new partition has to poll it periodically.

Time T1 - Keys K1 and K2 are assigned to partition A

Time T2 - Partition B joins, hash ordering : K2 --> K1 --> B --> A

Time T3 - B asks A for reconcile keys

Time T4 - K1 is reconciled from A to B

Time T5 - Partition C joins, hash ordering : K2 --> C --> K1 --> B --> A

Time T6 - C asks B for reconcile keys, but K2 has not synced to B yet.

Time T7 - No keys reconciled from B to C.

Time T8 - K2 is reconciled from A to B

Time T9 - C asks B for reconcile keys (again)

Time T10 - K2 is reconciled from B to C

The serialized message format used in this example service is very simple:

{
"operator" : "RECONCILE-KEYS", // kind of operation
"request_id" : "abc123", // unique id across all requests
"data" : {"partition" : "127.0.0.1:5004"}, // data to send
"request_type" : 0, // 0 for request and 1 for response
"timestamp" : 1690974252698 // current timestamp in ms
}

On receiving the above message, the leader of the next partition (in clockwise order) acts upon it, i.e. it gets the keys from its own hash map for which the next hash in the clockwise direction equals the hash of the new partition.

It then sends those keys to the new partition and at the same time deletes them from its logs and key-value store (and also replicates those changes to its replicas).

Deleting the keys from the log is equivalent to adding new log entries for the deletion operations.

public static void reconcileWithClient(String partition, SocketChannel client) {
try {
int nodeHash = partitioner.getHash(partition);

Set<String> keys = myMap.getKeys();
Set<String> toDelete = new HashSet<String>();

// Filter keys whose next hash is equal to the new partition hash
for (String k : keys) {
if (partitioner.getNextKey(k, false) == nodeHash) {
toDelete.add(k);
}
}

String putResponse = "";

for (String s : toDelete) {
JSONObject jsonObj = new JSONObject();

UUID uuid = UUID.randomUUID();
String val = myMap.get(s);
long ts = myMap.getTimestamp(s);

JSONObject dataObj = new JSONObject();
dataObj.put("key", s);
dataObj.put("val", val);
jsonObj.put("data", dataObj);

jsonObj.put("operator", "PUT");
jsonObj.put("request_id", uuid.toString());
jsonObj.put("request_type", 0);
jsonObj.put("timestamp", ts);

putResponse += jsonObj.toString() + "<EOM>";

// Create deletion request for self as well as own replicas
jsonObj = new JSONObject();

uuid = UUID.randomUUID();

dataObj = new JSONObject();
dataObj.put("key", s);

jsonObj.put("operator", "DEL");
jsonObj.put("request_id", uuid.toString());
jsonObj.put("data", dataObj);
jsonObj.put("request_type", 0);
jsonObj.put("timestamp", System.currentTimeMillis());

String req = jsonObj.toString();

handleDelRequest(req);
}

sendMessage(putResponse, client);

} catch (Exception e) {
e.printStackTrace();
}
}
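The key filter at the heart of the method above can be tried out with fixed ring positions. This is a sketch only: `ReconcileSketch`, `ownerPosition` and `keysToMove` are hypothetical names, and the integer positions stand in for real hash values.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ReconcileSketch {
    // Ring position of the partition that owns hash h: first position >= h, wrapping around.
    // Assumes the ring contains at least one partition.
    static int ownerPosition(TreeMap<Integer, String> ring, int h) {
        Integer pos = ring.ceilingKey(h);
        return pos != null ? pos : ring.firstKey();
    }

    // Keys (given with their hashes) that now belong to the newly added partition.
    static Set<String> keysToMove(TreeMap<Integer, String> ring,
                                  Map<String, Integer> keyHashes,
                                  int newPartitionPos) {
        Set<String> moved = new TreeSet<>();
        for (Map.Entry<String, Integer> e : keyHashes.entrySet()) {
            if (ownerPosition(ring, e.getValue()) == newPartitionPos) {
                moved.add(e.getKey());
            }
        }
        return moved;
    }
}
```

Using the ordering from the timeline (K2, K1, B, A), both keys move to B when B joins; once C joins between K2 and K1, only K2 moves on to C.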

What happens when a GET request comes for a key which has not yet been re-assigned to the new partition?

When a new partition joins the cluster, a client might send a GET query for a key during the interval in which that key is still syncing from the next partition.

The query will now come to the new partition, but the corresponding key has not yet synced, so the leader node will see that the key is not present.

One strategy is to wait for a few seconds and retry, but it is not guaranteed that the key will have synced by then.

Another strategy is to forward the request to the next partition leader, get the value and forward it to the client.

With this strategy, for every GET request where the key is not present, the partition leader has to ask its next partition leader to check whether the key is there.

However, the problem escalates if multiple new partitions join the cluster: the next partition could also be new and still syncing, in which case it has to forward the request to its own next partition, and so on.
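The forwarding strategy can be sketched as a bounded walk around the ring. This is an illustration under simplifying assumptions, not the repository's implementation: each partition is represented by a plain in-memory map, network calls are replaced by direct lookups, and `ForwardingGetSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

class ForwardingGetSketch {
    // stores.get(i) is the local map of the i-th partition in clockwise ring order.
    // Starts at the partition that should own the key and walks clockwise until the
    // key is found or every partition has been asked once.
    static String get(List<Map<String, String>> stores, int ownerIndex, String key) {
        for (int hop = 0; hop < stores.size(); hop++) {
            Map<String, String> store = stores.get((ownerIndex + hop) % stores.size());
            String val = store.get(key);
            if (val != null) {
                return val;
            }
        }
        return null; // not present on any partition
    }
}
```

Bounding the walk by the number of partitions keeps a missing key from being forwarded around the ring forever, at the cost of one extra hop per partition that is still syncing.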

Replicate from leader to replicas

Replication of logs from the leader to the replicas takes place inside a separate thread and runs periodically.

Thus the request-handling code does not need to explicitly send replication messages to the replicas.

public static void replicate() {
try {
while(true) {
// Replicate from leader only
if (isLeader) {
for (String replica : replicas) {
if (!replica.equals(hostPort)) {
// Default to -1 if the replica's sequence has not been fetched yet
int seq = logSeq.getOrDefault(replica, -1);

// Get all the logs to send to replica based on
// how updated the replica is.
List<String> logsToSend = getLogs(seq+1);

String msg = "";
int s = seq+1;

for (String log : logsToSend) {
JSONObject jObj = new JSONObject();
jObj.put("operator", "REPLICATE");

JSONObject dataObj = new JSONObject();

// Send sequence number of leader log to replica
dataObj.put("sequence", s);
dataObj.put("data", log);

jObj.put("data", dataObj);

UUID uuid = UUID.randomUUID();

jObj.put("request_id", uuid.toString());
jObj.put("request_type", 0);
jObj.put("timestamp", System.currentTimeMillis());

msg += jObj.toString() + "<EOM>";
s += 1;
}

sendMessage(msg, replica);
}
}
}

// Repeat replication every 1 second
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
}
}

During replication, we are also sending the sequence number of the message in the leader’s log.

It can happen that a replica applies a log entry but then fails to update its log sequence in Zookeeper. We then have a state where the log is updated but the sequence number is not, and the leader will mistakenly re-send log entries which have already been applied.

Thus, before applying a new log entry, the replica checks whether the sequence number sent by the leader is exactly one more than its own latest sequence number. If so, it applies the entry; otherwise it does not.
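The replica-side check can be sketched as follows. `ReplicaLogSketch` is a hypothetical class; it keeps the log in memory and starts from -1, matching the initial value stored in the replica's znode.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicaLogSketch {
    private final List<String> log = new ArrayList<>();
    private int lastSeq = -1; // same initial value as the "-1" stored in the replica znode

    // Applies the entry only when it is the immediate successor of the last applied one.
    synchronized boolean apply(int seq, String entry) {
        if (seq != lastSeq + 1) {
            return false; // duplicate or gap: ignore
        }
        log.add(entry);
        lastSeq = seq;
        return true;
    }

    synchronized int lastSequence() {
        return lastSeq;
    }
}
```

A re-sent entry carries a sequence number the replica has already passed, so `apply` returns false and the entry is not applied twice.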

Implementing the server to handle incoming requests

We implement the server using a socket selector (similar to select and epoll in C/C++), i.e. the selector monitors the channels/sockets for either new connections or incoming data and takes the necessary actions.

Details of using socket selectors can be found in some of my earlier posts.

public static void runServer(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();

serverSocket.bind(new InetSocketAddress("localhost", port));

// Sockets in selector needs to be non-blocking
serverSocket.configureBlocking(false);

// Register the server with selector. The server will only ACCEPT
serverSocket.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
selector.select();

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();

// Iterate only over those sockets which have a new connection or
// have data ready to be read.
while (iter.hasNext()) {
SelectionKey key = iter.next();

// New client connection
if (key.isAcceptable()) {
SocketChannel client = serverSocket.accept();

// Register client socket with selector
register(selector, client);
}

// Data sent over existing client socket
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();

// Parse messages by reading from the buffer
List<String> msgs = getMessages(client);
for (String msg : msgs) {
handleRequest(msg, client);
}
}

iter.remove();
}
}
}

public static void register(Selector selector, SocketChannel client)
throws IOException {
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}

I will not get into the details of how the server handles each type of request in this post. The complete code can be found in the repository.
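The getMessages helper used in the server loop is not shown in this post. Since every message in the code above is terminated with "<EOM>", one plausible way to split a byte stream into messages is delimiter framing. The sketch below is an assumption about how that could work, not the repository's code; `EomFramerSketch` and `feed` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class EomFramerSketch {
    private static final String EOM = "<EOM>";
    private final StringBuilder pending = new StringBuilder();

    // Appends newly read text and returns every complete message now available.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> msgs = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(EOM)) >= 0) {
            msgs.add(pending.substring(0, end));
            pending.delete(0, end + EOM.length());
        }
        return msgs; // any trailing partial message stays buffered
    }
}
```

One framer instance per client connection keeps partial messages from different sockets apart, and a message (or even the delimiter itself) that arrives split across two reads is returned once the rest shows up.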

funktor/zookeeper-kv-store: A distributed key-value store using Zookeeper for coordination (github.com)

I would request you to fork the repository and play around with it to understand it or to find any issues in the implementation.

