source: quantamagazine

Building a simple distributed hash table using Python and Sockets — Part 2

Abhijit Mondal
13 min readOct 31, 2022

--

In the last part, we built a “Happy Case” distributed hash table which can distribute all set and get requests across N instances (almost equally) thus allowing us to use N times the more RAM as compared to a single instance hash table. But there were several edge case scenarios which the last implementation could not handle.

In this post, we are going to solve some of those problems. Let’s look at the first problem:

What if one of the partition servers goes down?

In this scenario, all keys sent over to this server will be lost and as a result 1/N fraction of the keys from our hashtable will be missing. One possible solution is to use replicas for each partition.

Let’s define a replica set— A replica set in our context of distributed hash table is a set of servers or nodes holding the same set of keys.

A partition consists of a leader and a replica set.

Even if one or ‘few’ replicas from a partition goes down, our distributed hash table should work without interruptions. This is one of the goals of a distributed system: There should not be any single point of failure.

There exists at-least 3 different popular strategies:

  1. Asynchronous — ‘set’ operations are forwarded to the leader responsible for the key. The leader updates its hashtable and also asynchronously broadcasts the set operation to all the replicas.
    But the client need not wait for all the replicas to have been updated.
    During get operation, forward the request to one of the replicas.
    But there is no guarantee that we will get the latest value for a key since asynchronous updates might have not completed yet for some replicas.
  2. Synchronous —The leader updates its hashtable and synchronously broadcasts the set operation to all the replicas. The request ensures that all the servers in the partition are sucessfully updated before returning to the client and accepting any new read requests.
    During get operation we forward the request to one of the replica.
    Here we are guaranteed to get latest value for a key.
  3. Semi-synchronous —During set operation, the request ensures that atleast (N+1)/2 servers are sucessfully updated before returning to the client (assuming N is odd).
    If latest value is desired then during get operation, we query any (N+1)/2 servers and based on latest request id we choose which value to return. If older value is ok then we can query any replica.

Note that the leader is a single point of failure although we can mitigate that using leader election algorithms. Another possibility is using multiple leaders (across multiple regions).

The latest value or strong consistency guarantee for synchronous replication may not hold true when we have multiple leaders across regions. Because there is no single source of truth.

But let’s assume for now we only have a single leader.

The code for the core hashtable can be found here:

distributed-hash-table/hashtable.py at main · funktor/distributed-hash-table (github.com)

For simplicity, let’s use the fully synchronous replication for our DHT.

If you have seen the code from the previous part, in our latest version of the hashtable_service we are adding replication. For this we did few modifications to the code:

  1. The partitions command line parameter is an array of array instead of just an array. Each array is a partition with the first one being the leader. For e.g. if we have a partition A of 3 machines: ‘127.0.0.1:5001’, ‘127.0.0.1:5002’ and ‘127.0.0.1:5003’ and partition B of 3 machines: ‘127.0.0.1:5004’, ‘127.0.0.1:5005’ and ‘127.0.0.1:5006’, then the partitions input would look like:
    “[[‘127.0.0.1:5001’, ‘127.0.0.1:5002’, ‘127.0.0.1:5003’], [‘127.0.0.1:5004’, ‘127.0.0.1:5005’ and ‘127.0.0.1:5006’]]”
  2. During initialization, we assign the cluster index by scanning through the partitions array. Also the service in the 1st index is assigned to be the leader. In the above example, ‘127.0.0.1:5001’ and ‘127.0.0.1:5004’ are the leaders of the corresponding partitions.
  3. When a service receives a set request for a key. It first checks if the key is intended for its own partition. If it is, then it sets the key in the hashtable. Also if it is the leader, then it broadcasts the set operation to all its replicas. If the key is not intended for its partition, then it is forwarded to the leader of the partition for which it was intended for.
  4. When a service receives a get request for a key. It first checks if the key is intended for its own partition. If it is then it gets the value for the key from hashtable. If the key is not intended for its partition, then select a random replica from the intended partition and forward it to that replica.
  5. During broadcast of write to replicas, we wait for the replica servers to get started and then send the message. Also we need to wait indefinitely if a replica server is not responding or has crashed because we are doing synchronous replication.
  6. During forward of read request, if the server where request is forwarded is not responding or has crashed, we try a different random server after a timeout interval.
  7. During broadcast of write to replicas we use a blocking queue to capture the responses as we are sending the requests in parallel threads.
  8. We are using multithreaded Locks for sockets because since we are re-using sockets for sending and receiving messages, if 2 or more parallel threads are sending message to the same socket, then there is no guarantee that the responses will arrive in the same order. Thus we ensure sending and receiving messages through sockets are sequential.

We can do away with the locks on sockets, but we need something like a message queue to queue up the responses along with their message ids.

To create 2 partitions with 1 leader and 1 replica each, we use the following shell script to start 4 terminal windows corresponding to the 4 services. Also we start 1 client application to generate the queries.

Play around in the ‘client service’ terminal window by sending set and get requests. Observe that when a leader receives a set request, the corresponding replica also receives it afterwards. Add more partitions and replicas to have fun !!!

The client sends “set” requests with timestamp as request_id and then we send “get” request for same key and check if the output request id is equal to the set request id then the value of get should be equal to the value sent in set request id.

Although there could be situations where this test fails — Imagine 2 clients A and B sending “set” request for the same key (different values) with the same timestamp. If A value is inserted before B, then B value will be skipped. Now when B is requesting the key using “get”, it receives the value set by A but the request_id is same (since it was the same timestamp).

We can mitigate this by appending an unique integer id (hash of IP:PORT) after timestamp. In this case, the client with the highest id will always win in case of tie on timestamp.

One of the drawbacks of the above hashtable service implementation is :

What if we decided to add a new replica to a partition or add a new partition?

One option is to stop all the services, update the ‘partitions’ command line argument and restart them. Addition of new partition dynamically has another set of issues which we will see later in this post.

But it might not be a feasible solution if availability is our prime concern.

Let’s modify our code to handle addition of replicas dynamically.

The code above shows the incremental changes from the previous version so you can see that some of the common modules has been omitted here (in order to keep the code shorter).

We made a few changes:

  1. Added a lock to the objects holding partitions and replicas information because now these objects can be read and write at the same time by different threads.
  2. During startup, a replica service sends JOIN message to all leaders because leaders will either forward request to the replica during get request or broadcast set request.
  3. Once the leaders receives the JOIN message from a replica, it updates its copy of the partition object.

To test that a new replica added dynamically receives set and get requests, we start a new replica service to the leader ‘127.0.0.1:5003’

start "hash table 5" python "ht_service.py" "127.0.0.1" "5005" "[['127.0.0.1:5001', '127.0.0.1:5002'],['127.0.0.1:5003', '127.0.0.1:5004', '127.0.0.1:5005']]"

To see if this replica receives broadcast message from set operation, from the set of keys which went to the partition with leader ‘127.0.0.1:5003’, re-sent one of those keys with a new value (or same value) and then we can see that the new replica started receiving the message.

For get request since it is random, the new replica might receive the get request 1 in 3 trials (with 1 leader, 2 replicas)

In order to verify what happens if we close a replica, we can kill the service ‘127.0.0.1:5004’ and then send a new set request.

The request will be blocked and will not complete unless we start the service. For get request, the response will be delayed by some unknown amount because, if the get request is forwared to the killed service, we will not get response and only after timeout of 10 seconds, we will try another replica or leader.

But there is a problem as you can see, when we create a new replica and send set requests, the new keys will be broadcasted to the new replica also, but during get request, if the request is forwarded to the new replica for an old key that existed before the replica was created, it will send KEY NOT FOUND because the old keys were not synced to the new replicas.

We will use a commit log to log all our set operations in append only fashion. It serves 2 purposes:

  1. When a new replica joins, it can query the leader’s commit log and sync all the keys seen so far in its own hashtable.
  2. When a replica gets killed and restarted, all the keys can be synced from the leader’s commit log.

CommitLog class:

The log file looks something like this: commit-log-127.0.0.0.1–5003.txt

30/10/2022 00:36:41,set b 2 1
30/10/2022 00:36:44,set d 4 3
30/10/2022 00:36:49,set h 6 5
30/10/2022 00:36:53,set jj 7 6

Each line has the timestamp and then the set command separated by comma.

set key value request_id

The method ‘write_log_from_sock’ reads bytes from the log sent by the leader into the replica’s own commit log file.

The method ‘send_log_to_sock’ sends bytes from the leader’s commit log to the replica’s commit log.

The updated hashtable service code:

In the set operation, we also need to append the set request into the commit log. We only insert it if the set request do not conflict with existing key i.e. the request is the latest for this key.

The method join replica is updated to following:

  1. Create a dedicated socket connection to own partition’s leader and ask for the commit log file.
  2. The leader transfers its own commit log with buffer size of 4096 bytes or 4KB at a time.
  3. Once the bytes are received, it is written to replica’s own commit log.
  4. Also after own commit log is updated, new replica also updates its own in-memory hashtable by reading from the commit log.
  5. Close the dedicated socket connections.

To test this feature, add a new replica as shown earlier:

start "hash table 5" python "ht_service.py" "127.0.0.1" "5005" "[['127.0.0.1:5001', '127.0.0.1:5002'],['127.0.0.1:5003', '127.0.0.1:5004', '127.0.0.1:5005']]"

Then send some new set requests. But for get requests send some old keys that were not re-send after new replica creation. Observe that in 1 in 3 trials the get request lands up in the new replica and the old key’s value is also correctly returned.

Another way to verify is to look into the log files. The log files for [‘127.0.0.1:5001’, ‘127.0.0.1:5002’] should be identical and the log files for [‘127.0.0.1:5003’, ‘127.0.0.1:5004’, ‘127.0.0.1:5005’] should be identical.

But what if we add a new partition dynamically. We might think that it should work the same way as addition of new replica. But actually there is another subtle difference — The assignment of keys.

Since the number of partitions is updated. The hash modulo values will return different indices. Thus if during set operation a key lands in partition 1, if a new partition is added then it might happen that the get operation for same key now lands up in partition 2. Thus we will never find out these keys.

One solution is to re-distribute all the keys after a new partition is added. But this might be inefficient if there are billions of keys.

Another solution is to use Consistent Hashing (will not cover the details of Consistent Hashing in this post).

This is a Python implementation of Consistent Hashing that uses sortedcontainers for O(logN) lookup and insertion operations.

Nodes A and B are 2 partitions, but each have multiple virtual nodes inorder for equal distribution of keys.

The final hashtable service for this post including consistent hashing would be something like:

During setup, each service creates its own copy of the consistent hashing ring with all the leaders as the nodes.

Note that we do not need the replicas to be in the consistent hashing ring because routing of operations happens between the leaders of the partitions.

If a new partition is added, the leader first sends JOIN message to all other leaders. All leaders updates its copy of the partitions array.

The new partition leader then fetches all nodes that comes next in clockwise order from the consistent hashing ring. Since we have a multiplier factor in the consistent hashing ring, thus we can have multiple leader nodes which can come next in clockwise order. The commit log of these nodes are synced to the commit log of the new partition leader.

Node C is added as a new partition. Key “abc” has now moved from Node B to Node C.

But note that not all keys that were earlier pointing to these partitions will now point to the new partition leader. Only those keys that comes before the new partition leader in the clockwise order will be assigned to the new partition leader.

This can be achieved by using a temporary log and then filter those keys and then update the main commit log.

Note that the keys that were earlier pointing to some other leader in the consistent hashing ring will now point to the new partition leader but since the commit logs are append only, we need to send delete message to all those partition leaders with the moved keys.

Care must be taken as to not forward the delete message intended for a partition leader because then the delete message will circle back to the new partition leader as the key now points to the new partition leader.

To start a new partition, we can run another shell script but with different ips and ports.

#!/bin/bashstart "hash table 6" python "ht_service.py" "127.0.0.1" "5006" "[['127.0.0.1:5001', '127.0.0.1:5002'],['127.0.0.1:5003', '127.0.0.1:5004', '127.0.0.1:5005'], ['127.0.0.1:5006', '127.0.0.1:5007']]"start "hash table 7" python "ht_service.py" "127.0.0.1" "5007" "[['127.0.0.1:5001', '127.0.0.1:5002'],['127.0.0.1:5003', '127.0.0.1:5004', '127.0.0.1:5005'], ['127.0.0.1:5006', '127.0.0.1:5007']]"

After the partition starts, observe the commit log files !!!

** One important point to mention here is that when we are adding the new replicas or new partitions dynamically, we are adding new entries by maintaining the original partitions array structure. If someone does not follow these protocol here, the partitions array will be messed up.

Another feature that we missed to add during replication, is that during broadcast of message from leader to replicas for replication, it might happen that some replicas crashed or is not responding.

In this scenario, we will have the updated data in some replicas and some replicas has not been updated. Thus 2 concurrent read requests, one going to the replica successfully updated will see the latest data but the one going to the replica not updated will see stale data.

Thus we will have inconsistent view.

One can somehow fix the problem using a modified 2-phase commit protocol.

  1. When leader broadcasts set/delete requests to replicas, replicas will add the key in a temporary in-memory map and will send “ok” or “ko” response if this operation succeeds or fails.
  2. Once leader receives ok from all replicas, it sends commit message to the replicas, this time, the replicas will update their hashtable copies and remove the key from the temporary map.
  3. If a request arrives at a replica which has already commited the transaction with the same key, we will see the latest value.
  4. But if a request arrives at a replica which has not yet commited the key will wait till the transaction is commited. But it is guaranteed that the request will return the latest value.

Here is a short architecture of the flow that is happening.

Architecture Flow Diagram

I would not confidently say that the above code is error proof because this has not been production tested as of yet and it is just for demonstration purposes. Few things that I can recommend:

  1. Completely understand the dependency of the requests. For e.g. there should not be circular dependency like leader A is waiting for a response from leader B and leader B waiting for leader A before both of them starts to accept requests. The cycle would never terminate.
  2. Understand which resources are common resources for multithreading and guard them using locks.
  3. Catch exceptions but also print them.
  4. Services can have variable start times and the application should work seamlessly i.e. we should wait for a service to start before sending requests.
  5. … and so on.

But there are still some issues with the above implementation:

  • When a replica crashes, the request waits indefinitely for the replica to restart. This will cause performance issues for high availability and low latency systems.
  • Using locks on sockets degrades performance as too many concurrent requests to the leader will be replicated serially to each replica.
  • When the leader of a partition crashes, all requests sent to the leader will be lost till the time the leader is again up and running.
  • Normally we will have millions of clients connected to the application. Instead of request_id for each set and get request, we will have client timestamp, but client clocks are not synchronized, thus we cannot use client timestamp to decide ordering. In such situations it is not important to find which request is the latest one but it is more important for the leader and the replicas to reach a consensus regarding which request to consider.
  • Serialization of the messages — Right now we are assuming a very simple messaging format between the instances. For more complicated situations we need to structure the messages and also serialize/deserialize them for sending/receiving respectively.

We will tackle these issues in the next post where we will use Raft Consensus algorithm for semi-synchronous replication and also leader election.

Full code is available on my Github repository:

funktor/distributed-hash-table: Distributed Hash Table implementation using Sockets (github.com)

Resources:

  1. The Ultimate Guide to Consistent Hashing | Toptal
  2. Leader election algorithms — Distributed Systems (distributedsystemsblog.com)
  3. Fundamentals of Distributed Systems | Baeldung on Computer Science

--

--

No responses yet