# Deep dive into Chord for Distributed Hash Table

In a distributed hash table, a hash table with N keys is partitioned into M partitions or nodes. This is done in order to handle large number of keys and queries that exceeds the limitations of a single machine CPU and RAM.

For e.g. if the size of a hash table is 10TB, and a single machine can only handle 10GB of in-memory data and 100K requests per second, then to handle say 5 million RPS, we would need approximately:

**max(10TB/10GB, 5million/100K) = max(1024, 50) = 1024 machines.**

Although there are several challenges going from a single machine to a distributed setting with M machines, for this post we will concern ourselves with the following problems:

- When a client issues a PUT or a GET command, how the request is routed to the correct machine/partition efficiently?
- How to ensure high availability in the face of some machines not responding?

**Keys are assigned to machines using the Consistent Hashing protocol.**

Let H be some hash function that generates hash values uniformly over the range [0, P). For each machine, we compute its hash using its machine id Q.

Machine hash = H(Q)

Similarly for each key, K, we compute its hash using the same hash function H i.e. H(K).

K is assigned to Q, if H(Q) is the ‘next higher’ hash of H(K) among all possible Q’s i.e. the smallest machine hash greater than equal to H(K).

Although the definition above is loose when we are talking about a ring like architecture. More accurate would be to say that H(Q) is the next hash in the clockwise direction from H(K).

## What are some routing options in the consistent hashing ring?

When a client sends a PUT or a GET request, the request will come to one of the machines (most likely the one closest to the client). The key K in the request is either intended for the machine or its not.

If its intended for that machine (with a

probability of 1/M), then it can update its own hash table (if its a PUT) or retrieve the value for the key (if its a GET) and send back to client.But the probability, that it is not

intended for itself is (M-1)/M, which is much greater. In such a case how should the machine forward the request to the correct machine?

One possible solution is to store an entry table at each machine with the **hash of each machine and the corresponding endpoint** (to forward the request). The endpoint could be a **HTTP(s) URL, a TCP socket** etc.

The machine can then do a **lookup in O(M) time** if the entries are **not sorted by the hash** value or in **O(logM) time if they are sorted by hash** values.

Then forward the request to the endpoint whose hash value comes

next in clockwise direction from K, in the consistent hashing ring.

`def next_machine(entry_table, inp):`

min_d = 1 << 32

ep = None

for hash, endpoint in entry_table:

# Find the closest machine to inp in clockwise direction

# from inp

if inp <= hash and (hash-inp) < min_d:

min_d = min(min_d, hash-inp)

ep = endpoint

# input hash could be higher than maximum hash among all machines

# which implies that it should go to the 1st machine because its

# a ring

if ep is None:

ep = entry_table[0].endpoint

return ep

# Using binary search on sorted hashes

def next_machine(sorted_entry_table, inp):

left, right = 0, len(sorted_entry_table)-1

ep = sorted_entry_table[0].endpoint

while left <= right:

mid = (left+right)/2

if sorted_entry_table[mid].hash >= inp:

ep = sorted_entry_table[mid].endpoint

right = mid-1

else:

left = mid+1

return ep

The

space complexityof storing the entry table isO(M). But thehop complexityto route to the correct machine isO(1)because the key reaches its intended machine is either1 step or 2 steps.

Also whenever a new machine joins, it has to inform all existing machines in the ring, which **incurs a cost of O(M) messages**.

**Another approach could be to only store an entry table at each machine with only the next and previous machines in the ring.**

In this setting, whenever a key K is not intended for the machine it will forward the key to its successor machine and so on until it reaches the correct machine.

The

space complexityof storing the entry table isO(1). But thehop complexityto route to the correct machine isO(M)because in the worst case the key has to traverse M-1 machines before reaching the correct one.

Whenever a new machine joins, it has to inform only its successor and predecessor i.e. **O(1) messages during joins**.

One might argue that the former approach is better because the cost of network I/O in the 2nd approach for each PUT or GET request offsets any advantage gained using a smaller entry table since in most real life applications M could be well below 1000.

Also new machines join an existing cluster not as frequently as PUT or GET requests come.

But we can also find a middle ground where we need to a store an entry table of size

O(logM)and use onlyO(logM)forwards to reach the correct machine. This is what Chord does.

Instead of each machine storing the entire entry table, it stores O(logM) entries. The entries are as follows:

If the machine has a hash value of H(Q), then each entry i is of the form: (H(Q)+2^i)%P, where i ranges from 0 to log(P)-1, where P is the maximum value.

**Let us denote it by F(Q,i) = (H(Q)+2^i)%P**

These values are used as keys in the entry table, the value corresponding to the i-th key is the

machine hash whose hash is the next in the clockwise direction from F(Q,i).

For e.g. the hashes of 4 machines in increasing order are as follows: 1, 3, 15 and 24 for P=32.

Then for the machine with hash = 3, the key entries are F(3,0)=(3+1)%32, F(3,1)=(3+2)%32, F(3,2)=(3+4)%32, F(3,3)=(3+8)%32, F(3,4)=(3+16)%32

4, 5, 7, 11, and 19

These are the hashes of the entry table. The successor corresponding to 4 would be the machine hash next in clockwise direction from 4 which in this case is 15.

Similarly for entry 5, 7 and 11, the successor is again 15. For 19 it is 24.

If a request comes to the machine with hash = 3, with key whose hash value is H(K)=28, then the machine looks up the entry table, and finds the hash in the entry table whose value is next in anti-clockwise direction from H(K).

In this example, that hash would be 19.

The successor corresponding to hash 19 is the 3rd machine with hash = 24. Thus the key is then forwarded to the machine whose is hash=24.

But note that the key with hash=28 should not be allocated to machine with hash=24. Hence we repeat the step again at machine hash=24. This time we find the hash which is next in clockwise direction is 28 which equals H(K).

The successor corresponding to hash 28 is the machine with hash=1. Thus the key is then forwarded to the machine with hash=1.

There are some edge cases to consider though:

What if the hash of key is smaller than the smallest hash?i.e. what if the hash of key is 2 when the smallest hash in entry table is 4.In this case it

maps to the largest hashi.e. 19 because note that the hashes are arranged in a ring and if you go anti-clockwise from hash = 2, you will find hash=19 the first entry hash.

## How to decide whether the key has landed up at the correct machine?

If we know what is the hash of the previous machine in the ring and if the hash of key lies in between the hash of the previous machine and the current machine, then the key belongs to the current machine.

Let us define a function dist(a, b), that takes as inputs 2 hashes a and b and returns the clockwise distance from a to b in the ring of size N:

`def dist(a, b):`

if a <= b:

return b-a

return N+b-a

Let H(Q) be the hash of the current machine and H(Q_prev) be the hash of Q’s previous machine, then key K belongs to Q if:

`dist(H(K), H(Q)) < dist(H(K), H(Q_prev))`

This inequality holds true not only for Q_prev but for any other machine if K belongs to machine Q.

Thus we need to maintain a predecessor pointer to the previous machine in the consistent hashing ring.

## How to prove that the key will eventually land up at the correct machine?

Let the key lands up at a machine with hash=x whereas the actual hash of the machine where it should go is hash=y. Let the number of hops from x to y in the clockwise direction in the ring be Z.

Remember that, we can write any positive integer as sum of powers of 2. For e.g. if Z=20, then we can write it as 16+4 or Z=15, we can write it as 8+4+2+1. **The number of terms to add is equal to the number of 1 bits in the binary representation of Z.**

Binary(Z=20)=10100, Binary(Z=15)=1111

Thus starting from hash=x, find the ‘

highest power of 2 hop’ that will take itclosestto hash=y, this corresponds to the 1st 1 in the binary representation. At whichever hash it lands next, repeat the same until eventually it will land at the correct machine.

`def find_successor(self, inp):`

if (dist(inp, self.hash) < dist(inp, self.predecessor.hash)):

return self

else:

ep = next_machine(self.entry_table, inp)

return find_successor(ep, inp)

## What happens when a new machine gets added?

Let a new machine Q’ with hash=H(Q’) is added.

Then for all the keys K which were previously allocated to its successor machine H(Q’_succ) in the circle but now we have:

`dist(H(K), H(Q')) < dist(H(K), H(Q'_succ))`

These keys K has to be allocated to Q’ and deleted from Q’_succ.

`def get_keys_from_successor(self):`

kv_pairs = fetch_keys(self.successor, self)

for k, v in kv_pairs:

insert_into_hash_table(k, v)

def fetch_keys(self, machine):

keys_to_delete = []

output = []

for k, v in self.hash_table.items():

key_hash = hash(k)

# Check which keys should be transferred

if dist(key_hash, machine.hash) < dist(key_hash, self.hash):

keys_to_delete += [k]

output += [(k, v)]

# Delete the keys which are to be transferred

for k in keys_to_delete:

self.hash_table.remove(k)

return output

Thus we need to keep a successor pointer at each machine that will point to the next machine in the clockwise direction in the ring.

**This takes care of the keys, but what about the entry table of Q’, how it is going to be updated?**

For each of the hashes, F(Q’, i), repeat similar steps as when we are trying to send a PUT or a GET request with a key K. Find the machine where the hash F(Q’, i) should land as if it is a key.

The successor corresponding to the hash F(Q’, i) is the hash of the machine where the hash lands up eventually.

Making network I/O for all the log(P) entries can be time taking and meanwhile Q’ will have incorrect entries which will slow down both PUT and GET requests.

One optimization that can be used here is as follows:

If for some i, let the successor corresponding to F(Q’, i) be V, then for i+1, if F(Q’, i+1) is located in between F(Q’, i) and V, then the successor for F(Q’, i+1) is also V and we do not have to make another network call to fetch those.

`def create_entry_table(self):`

prev_hash = -1

prev_succ = -1

for i in range(math.log(P)):

curr_hash= (self.hash + (1 << i))%P

if (prev_hash != -1 and \

dist(curr_hash, prev_succ) < dist(prev_hash, prev_succ)):

s = prev_succ

else:

# need to make request to a random machine as this machine

# has not joined yet and acts as a client

s = find_successor(random_machine, curr_hash)

self.entry_table[i].hash = curr_hash

self.entry_table[i].successor = s.hash

self.entry_table[i].endpoint = s.endpoint

prev_hash = curr_hash

prev_succ = s

**But what about the entry tables for the other machines, how are they going to get updated when Q’ is added?**

Observe that not all machines will have Q’ added in their entry tables. For e.g. for i=3, only the machines located at-least

(H(Q’)-2³)%P distance in anti-clockwise directionfrom Q’ can possibly have Q’ for its entry i=3.

For e.g. assume that H(Q’)=100, then which all machines Q will have Q’ in its entry table for the i=2-th entry?

For Q’ to be a successor of F(Q, 2), it must satisfy F(Q, 2) ≤ 100.

`H(Q)+4 ≤ 100, H(Q) <= 96`

Consider the machine with hash=97. What will be its F(Q, 2) value? It will be 97+2²=101 which is greater than 100. Thus 100 can never be successor of F(Q, 2) for machine Q. Similarly for machines with hash=98 and hash=99 which have hashes > 100.

But for all machines with hash ≤ 96, F(Q, 2) will be ≤ 100. Thus the machine Q’ with hash=100 is a potential successor of all machines with hash ≤ 96.

Similarly, Q’ is a potential successor of F(Q, 3) for all machines Q with hash ≤ 92 and so on.

`def synchronize_entry_table(self):`

for i in range(math.log(P)):

hash = (self.hash - (1 << i)) % P

# Get machine corresponding to hash

s = find_successor(random_machine, hash)

while True:

# Update entry table of machine and its predecessors

updated = update_entry_table(s, self, i)

if updated is False:

break

s = s.predecessor

def update_entry_table(self, machine, i):

hash = (self.hash + (1 << i)) % P

for j in range(len(entry_table)):

if entry_table[j].hash == hash:

# Update successor if new successor is located 'before'

if (dist(hash, machine.hash) < \

dist(hash, entry_table[j].successor)):

entry_table[j].successor = machine.hash

entry_table[j].endpoint = machine.endpoint

return True

return False

Apart from updating the entry tables, the predecessor and successor pointers also needs to be updated.

When a new machine is added, its predecessor is the predecessor of its successor and the successor of its predecessor is itself. The predecessor of its successor is updated to itself.

`def connect_predecessor(self):`

pred = get_predecessor(self.successor)

self.predecessor = pred

set_predecessor(pred, self)

def set_predecessor(self, machine):

self.successor = machine

def update_successor(self):

set_successor(self.successor, self)

def set_successor(self, machine):

self.predecessor = machine

Here is the full algorithm, when a new machine is added to the consistent hashing ring:

`def join(machine_id, endpoint):`

self.hash = hash(machine_id)

self.endpoint = endpoint

# Find successor

succ = find_successor(random_machine, self.hash)

self.successor = succ

# Update predecessor

connect_predecessor(self)

# Update successor

update_successor(self)

# Update own entry table

create_entry_table(self)

# Update entry tables of other machines

synchronize_entry_table(self)

# Get eligible keys from successor

get_keys_from_successor(self)

## What happens when a machine is not responding?

All PUT and GET requests for keys that are allocated to the unresponsive machine will either timeout or throw an exception.

But that does not mean that any request intended for a different machine, will also not work properly.

From our earlier example, let us assume that the machine with hash=24 is unresponsive.

When the request for key K with H(K)=28, comes to machine with hash=3, it has to forward the request to the machine with hash=24. But since it is not responding, **it can instead forward the request to its successor** i.e. hash=15.

At hash=15, the hash values and their successors are as follows:

`Hash Successor`

16 24

17 24

19 24

23 24

31 1

Again this machine finds out that it has to forward the request to hash=24, because the first hash lower than H(K)=28 is 23 which has its successor hash=24.

But note that now the successor of 15 is also hash=24 so this machine can neither forward to the correct machine from the entry table nor its successor because both are unresponsive.

**The solution here is to use R successors instead of a single successor. Generally R=log(P).**

i.e. each machine stores R=log(P) successors.

Thus even if a successor is not responding, it still has R-1 options left to forward that request to.

The updated find_successor code will return itself as well as its R successors.

successor_list is a sorted set (list) like data structure that is ordered based on how far the successor is from itself.

`def find_successor(self, inp):`

if (dist(inp, self.hash) < dist(inp, self.predecessor.hash)):

return self + self.successor_list

else:

ep = next_machine(self.entry_table, inp)

return find_successor(ep, inp)

In the join method, the new machine will fetch its successor along with its successor’s successors. After this, the machine can update its own successor_list which is a sorted set and limit its size to log(P).

`def join(machine_id, endpoint):`

self.hash = hash(machine_id)

self.endpoint = endpoint

# Find successor

succ = find_successor(random_machine, self.hash)

self.successor = succ[0]

# Update successor list

for i in range(len(succ)):

self.successor_list.insert(succ[i])

# If size of successor list exceeds log(P), remove the last

# successor which by definition should be the farthest from self

if len(self.successor_list) > math.log(P):

self.successor_list.pop()

Thus in the presence of a malfunctioned machine among M machines, only 1/M keys will have trouble getting a response, remaining (M-1)/M fraction of keys will get correct response.

**For the remaining keys, one can continue to retry until the malfunctioned machine is again up and running.**

In general to guarantee high availability i.e. make sure that all GET and PUT requests return correct responses, we need to use **replication **for each partition. i.e. each machine in the pool will have additional 2 more replicas which will have the same data.

Thus even if one of the replica is unresponsive, request can be sent to 2 other replicas and later when the unresponsive replica is back again, it will sync itself with the other replicas using a

consensus protocol like Raft or Paxos.

There is one more issue when a machine joins a consistent hashing ring.

In the above diagram, the machine with hash=25 joins after the machine with hash=20 joins.

So after hash=20 joins, assuming successor list contains 3 successor machines, it will have its successor_list as [24, 1, 3].

But when hash=25 joins, the successor of hash=24 will be updated but its successor list which earlier had [1, 3, 15] will now have to be updated to [25, 1, 3] and similarly the successor list of hash=20 will have to be updated to [24, 25, 1].

The updations can be done by the new machine when it joins by sending its successor list around the ring or each machine can be updated periodically using stabilization messages.

The problem with one time propagation is that if some machine becomes unresponsive while propagation is taking place, then either the propagation will stop and timeout (if only single successor or predecessor) or if it propagates by going ‘around’ the unresponsive machine and again when the unresponsive machine is back, some machines will have incorrect successors.

**Thus doing perioding stabilization is a preferred way.**

`def stabilization(self):`

timeout = 0

while True:

if time.time() > timeout:

# Find successor

succ = find_successor(self.successor, self.hash)

self.successor = succ[0]

# Update successor list

for i in range(len(succ)):

self.successor_list.insert(succ[i])

# If size of successor list exceeds log(P), remove the last

# successor which by definition should be the farthest

# from self

if len(self.successor_list) > math.log(P):

self.successor_list.pop()

# Update own entry table

create_entry_table(self)

# Reset timeout

timeout = time.time() + random.randint(1, 10)

## What happens when a machine is not fully updated but it starts to receive PUT and GET requests?

When a machine joins, it follows a protocol where it finds its successors, updates predecessors, updates own entry table, updates other machine’s entry tables and reconciles keys from its successor.

All of this can take a while to complete especially if the network is slow.

Should the machine accept client requests all the while doing the above operations? and if yes, how will that impact the correctness and performance of the distributed hash table?

If the successor and predecessor of the new machine is setup, it can accept new client requests.

- If its
**entry table is not updated**, it can**forward the client request to its successor**which will in turn route the key to its correct machine. - If the
**entry table is partially updated**i.e. say when the machine 25 (above picture) joins and machine 15 is unresponsive, then the successor for the entry (25+2⁴)%32 = 9 is 20 which should have been 15, but 15 is unresponsive.

Although this entry is incorrect, but note that this should not lead to incorrect response i.e.**keys will be correctly routed although it might take additional hops**.

During stabilization, when machine 15 is again running, the**entry table of 25 will be updated and routing will again happen through 15**. - Earlier we have seen that when a machine becomes unresponsive, the system can still send the correct response provided the key is not intended for the unresponsive machine.
- Points 3 and 4 above holds true even when
**multiple machines are added simulataneously or multiple machines fails simultaneously**.

**In gist, the chord ring is self-healing.**

Here is C++ implementation I wrote from scratch that uses sockets for communication: funktor/chord-dht: Implementation of chord for distributed hash table (github.com)