Parallel & Concurrent Programming (with Java)

In this series of posts we will look at how to implement parallel and concurrent algorithms in different programming languages: Java, Python, Go and Haskell. There is no particular reason why I chose these 4 languages, except perhaps for Haskell (I love Haskell).

In this post we explore concurrency with Java.

We will look into 2 different problems:

  1. A simple Wikipedia crawler — Crawls wiki pages up to a depth of 3.
  2. Multithreaded Merge Sort.

Before diving into the approaches, let’s make the difference between parallel and concurrent programming clear.

In concurrent programming, multiple tasks are executed in an interleaved fashion. For example, a single CPU core can execute only one thread at any instant, but the scheduler can switch between several threads so that all of them make progress. Having more than one thread therefore does not necessarily mean that the tasks are executed in parallel.

On a single core, only one thread is active at any given time.

Multithreading (on a single CPU core) is Concurrent.

In parallel programming, multiple tasks run at the same instant on multiple CPU cores. If there are N cores, we can create N separate processes and have each process execute one task, so that at any given time N tasks are being executed in parallel.

Multiprocessing is Parallel.

Although a Java program runs in a single JVM process, mainstream JVMs map Java threads to native operating system threads, so the threads can be scheduled across multiple CPU cores.

In Java, multiple threads can run on multiple cores at the same time. In Python (CPython) they cannot, because the Global Interpreter Lock allows only one thread to execute Python bytecode at a time. Multithreading is therefore more useful in Java: it executes tasks in parallel much like multiprocessing, but all threads share the same memory address space, and creating and destroying a thread has lower overhead than doing the same for a process. In Python, multiprocessing is the more useful option if we want to leverage parallel programming.

But if we have more threads than CPU cores, some of them will run concurrently instead of in parallel.
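To make the "threads share memory and can use several cores" point concrete, here is a minimal sketch. The class name SharedCounterDemo and the helper countWithThreads are my own illustrative names, not part of any library. It asks the JVM how many cores it can see and then lets that many threads update one shared counter.

```java
import java.util.concurrent.atomic.AtomicLong;

class SharedCounterDemo {

    // Starts threadCount threads that all increment the same counter object.
    static long countWithThreads(int threadCount, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();   // lives on the heap, visible to every thread
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();   // atomic, so no updates are lost
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();                        // wait for every thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Cores visible to the JVM: " + cores);
        System.out.println("Final count: " + countWithThreads(cores, 100_000));
    }
}
```

With a plain long instead of AtomicLong the final count would usually come out too low, which is the kind of race condition the thread-safe collections used later in this post are meant to prevent.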

The 2 important differences between a Thread and a Process are:

  1. Threads share the same address space in memory whereas Processes have individual address spaces. Thus memory consumption is higher in multiprocessing, as each process keeps its own copy of the objects in its own address space.
  2. Multithreading is good for I/O bound tasks such as reading files from disk, downloading from internet etc. whereas multiprocessing is good for CPU bound tasks such as sorting, searching etc.

By point 2 above I do not mean that multiprocessing cannot be used to read files or that multithreading cannot be used to sort numbers.

When reading files from disk, the CPU sits idle while a file is being read. With multiple threads, another thread can start reading another file while the first read is still in progress. The same can be achieved with multiprocessing, except that separate processes read the different files. Since most of the time goes into waiting for the reads to complete, threads serve the same purpose as processes here.

For sorting numbers, a single CPU core is busy most of the time, so having multiple threads on that core does not make sense: the running thread rarely waits, and the other threads get little opportunity to do useful work in the meantime.

In such cases running multiple processes on multiple CPU cores can improve speed as processes are running in parallel instead of concurrently.

Java Web Crawler (Non Threaded):

  1. Start with a Wikipedia page (here it is the Travelling Salesman Problem).
  2. Download the page, parse the HTML content, and extract all the URLs that refer to other Wikipedia pages.
  3. Insert each extracted URL that has not been visited earlier into a double ended queue. Essentially we are doing Breadth First Search.
  4. For each page, we extract at most 10 URLs to avoid exploding the number of URLs and internet bandwidth usage.
  5. Remove the URL at the front of the double ended queue and repeat from step 2 until we have reached a depth of 3.
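The steps above can be sketched as follows. This is not the original crawler: to keep the example runnable without a network connection, downloading and HTML parsing are hidden behind a hypothetical fetchLinks function, and the names SequentialCrawler and crawl are mine. The start page counts as depth 0.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class SequentialCrawler {

    // Breadth-first crawl; returns the URLs in the order they were visited.
    static List<String> crawl(String startUrl, int maxDepth, int maxLinksPerPage,
                              Function<String, List<String>> fetchLinks) {
        Set<String> visited = new HashSet<>();
        List<String> order = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>();
        visited.add(startUrl);
        queue.addLast(startUrl);

        // Process the queue one BFS level at a time so the depth is always known.
        for (int depth = 0; depth <= maxDepth && !queue.isEmpty(); depth++) {
            int pagesAtThisDepth = queue.size();
            for (int i = 0; i < pagesAtThisDepth; i++) {
                String url = queue.pollFirst();
                order.add(url);
                if (depth == maxDepth) {
                    continue;                       // deepest level: do not expand further
                }
                List<String> links = fetchLinks.apply(url);
                int limit = Math.min(maxLinksPerPage, links.size());
                for (String link : links.subList(0, limit)) {
                    if (visited.add(link)) {        // add() returns false for known URLs
                        queue.addLast(link);
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // A tiny in-memory "web" standing in for real Wikipedia pages.
        Map<String, List<String>> fakeWeb = Map.of(
                "A", List.of("B", "C"),
                "B", List.of("C", "D"),
                "C", List.of("A"),
                "D", List.of("E"));
        System.out.println(crawl("A", 2, 10, url -> fakeWeb.getOrDefault(url, List.of())));
        // prints [A, B, C, D]
    }
}
```

In a real crawler fetchLinks would download the page and extract its links with an HTML parser. Keeping it as a parameter makes the traversal logic easy to test on its own.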

How to make the implementation multithreaded?

  1. Create a double ended queue that is shared among all the threads.
  2. Each thread polls the head of the queue. If there is a URL, the thread removes it, follows steps 2–4 above and inserts the new URLs at the tail of the queue.
  3. A thread should not exit just because the queue is momentarily empty: other threads may still be parsing HTML and may not yet have added their new URLs to the queue. To handle this we use a BlockingQueue implementation.
  4. A BlockingQueue can make a thread wait until a new URL is available (take() waits indefinitely, while poll() with a timeout waits for a bounded time). We are using LinkedBlockingQueue as the size of the queue is dynamic.
  5. The HashMap of visited URLs also needs to be thread safe since multiple threads are simultaneously reading from and writing to it. To avoid race conditions we will be using ConcurrentHashMap instead of HashMap.
  6. To share the queue and the map across all threads we define a custom Runnable class whose fields reference the shared queue and map.

We can implement multithreading in several different ways in Java. We will look at 3 of them here:

  1. Using the Thread class.
  2. Using Executors and ExecutorService.
  3. Using CompletableFuture.

First we define our custom Runnable class. Basically each thread created from our driver program will execute the code inside the run() method of this class.

A Runnable instance is like a task for a Thread.

Thus our run() method should resemble the non-threaded crawler implementation, except that the data structures used for the queue and the HashMap need to be thread safe.
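A minimal sketch of such a Runnable is shown below. It is not the original class: the names CrawlTask, Page and fetchLinks are hypothetical, the downloader is again a stand-in function, and I assume that a queue which stays empty for 200 ms means the crawl has finished (a simple way to let threads exit instead of blocking forever on take()).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class CrawlTask implements Runnable {

    // A URL together with its distance from the start page.
    static final class Page {
        final String url;
        final int depth;
        Page(String url, int depth) { this.url = url; this.depth = depth; }
    }

    private final BlockingQueue<Page> queue;                  // shared work queue
    private final ConcurrentHashMap<String, Boolean> visited; // shared visited set
    private final Function<String, List<String>> fetchLinks;  // hypothetical download + parse step
    private final int maxDepth;
    private final int maxLinksPerPage;

    CrawlTask(BlockingQueue<Page> queue, ConcurrentHashMap<String, Boolean> visited,
              Function<String, List<String>> fetchLinks, int maxDepth, int maxLinksPerPage) {
        this.queue = queue;
        this.visited = visited;
        this.fetchLinks = fetchLinks;
        this.maxDepth = maxDepth;
        this.maxLinksPerPage = maxLinksPerPage;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Assumption: an empty queue for 200 ms means there is no more work.
                Page page = queue.poll(200, TimeUnit.MILLISECONDS);
                if (page == null) {
                    return;
                }
                if (page.depth >= maxDepth) {
                    continue;                       // deepest level: do not expand further
                }
                int taken = 0;
                for (String link : fetchLinks.apply(page.url)) {
                    if (taken == maxLinksPerPage) {
                        break;
                    }
                    taken++;
                    // putIfAbsent is atomic, so only one thread ever enqueues a given URL.
                    if (visited.putIfAbsent(link, Boolean.TRUE) == null) {
                        queue.put(new Page(link, page.depth + 1));
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // restore the flag and stop working
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> fakeWeb = Map.of(
                "A", List.of("B", "C"), "B", List.of("D"), "D", List.of("E"));
        BlockingQueue<Page> queue = new LinkedBlockingQueue<>();
        ConcurrentHashMap<String, Boolean> visited = new ConcurrentHashMap<>();
        visited.put("A", Boolean.TRUE);
        queue.add(new Page("A", 0));
        // Run the task on the current thread just to show the behaviour.
        new CrawlTask(queue, visited, url -> fakeWeb.getOrDefault(url, List.of()), 2, 10).run();
        System.out.println(visited.keySet());
    }
}
```

The check-and-insert on the visited map has to be a single atomic call. A separate containsKey() followed by put() would let two threads enqueue the same URL.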

Next we define our driver code. The first version uses the Thread class.

We create a single Runnable instance that is shared across all threads. Each thread executes the same Runnable, so the queue and the HashMap are also shared among the threads. Hence they need to be thread safe.

Each thread updates the queue and HashMap independently of other threads.

Since the number of threads is 50 but my machine has 4 cores, each core handles approximately 12 threads concurrently, and at any time at most 4 threads are running in parallel.
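A driver of this shape could look roughly like the sketch below. The names ThreadDriver, runOnThreads and drainWith are illustrative, and the shared task simply drains a queue of integers as a stand-in for the crawler Runnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadDriver {

    // Starts threadCount threads that all execute the SAME Runnable, then waits for them.
    static void runOnThreads(Runnable sharedTask, int threadCount) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(sharedTask, "worker-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
    }

    // Stand-in workload: every thread pulls items from one shared queue until it is empty.
    static int drainWith(int itemCount, int threadCount) {
        Queue<Integer> work = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < itemCount; i++) {
            work.add(i);
        }
        AtomicInteger processed = new AtomicInteger();
        Runnable sharedTask = () -> {
            while (work.poll() != null) {
                processed.incrementAndGet();
            }
        };
        runOnThreads(sharedTask, threadCount);
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed: " + drainWith(1_000, 50)); // prints Processed: 1000
    }
}
```

Because the Runnable holds the only references to the queue and the counter, handing the same instance to every Thread is what makes those structures shared.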

The next implementation uses Executors and ExecutorService.

ExecutorService maintains an internal queue of tasks to be executed. In this case every task is the same, i.e. the Runnable. The service also maintains a pool of worker threads (here it is 5); such pools are typically created through the Executors factory class.

When worker threads are free they poll tasks from the internal queue (not to be confused with our URLs queue). This is different from what the previous implementation with the Thread class does.

In the previous implementation, threads were assigned tasks upfront. That assumes all tasks are equivalent, but some URLs may be downloaded faster than others, and the threads that finish early then sit idle waiting for the other threads to complete.

The Thread-based approach does not decouple tasks from threads: if we want to run N tasks we have to create N threads. With ExecutorService, the number of threads can be much lower than the number of tasks. If we want to run, say, 50 tasks we do not need 50 threads. In the above we created 5 threads for 50 tasks.

Thus when a thread finishes downloading a URL, it can pick up the next task from the task queue instead of sitting idle.
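As a rough illustration of "50 tasks on 5 threads", here is a small sketch. ExecutorDriver and runTasks are names I made up, and each task only bumps a counter instead of downloading anything.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorDriver {

    // Submits taskCount tasks to a fixed pool of poolSize threads and waits for completion.
    static int runTasks(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // Tasks wait in the pool's internal queue until a worker thread is free.
            pool.execute(() -> {
                completed.incrementAndGet();
            });
        }
        pool.shutdown();                             // accept no new tasks, finish queued ones
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();                  // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runTasks(50, 5)); // prints Completed: 50
    }
}
```

Forgetting shutdown() is a common mistake: the pool's worker threads are non-daemon by default, so the JVM would not exit on its own.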

The final approach uses CompletableFuture.

CompletableFuture is useful when we want to have callbacks and also compose or chain multiple steps in an asynchronous manner.

For example, suppose that after a URL is downloaded it must be written to a database table. With the Thread class or ExecutorService, we need to explicitly write the logic for writing to the database in the same Runnable.

What if the Runnable is part of some external package which we are not supposed to edit?

In that case, we first have to “join” all threads, then extract all the downloaded URLs, and then write the logic to insert them into the DB table, either in a single thread or by creating another ExecutorService for the inserts.

This is very inefficient.

With CompletableFuture, we can define our own methods (or callbacks) to run after a URL is downloaded by a thread, without modifying the original Runnable class or joining all the threads.

CompletableFuture.supplyAsync(supplier).thenAccept(url -> insertIntoDB(url));

This runs the supplier (a Supplier is a task that returns a value, much like a Callable) asynchronously on a pool thread; by default that pool is the common ForkJoinPool. Once the supplier has finished, its output, the downloaded URL, is passed to insertIntoDB() in the thenAccept() callback.
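Here is a small runnable version of that chain. Everything concrete in it is an assumption made for illustration: download() just builds a string, and an in-memory list plays the role of the database table.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class CallbackDemo {

    // Hypothetical stand-in for an HTTP download.
    static String download(String url) {
        return "content of " + url;
    }

    // Downloads asynchronously, then stores the result in a callback.
    static List<String> downloadThenStore(String url) {
        List<String> fakeDb = new CopyOnWriteArrayList<>();   // stand-in for a DB table
        CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(() -> download(url))             // runs on a pool thread
                .thenAccept(page -> fakeDb.add(page));        // runs once the download is done
        pipeline.join();                                      // wait so we can inspect the result
        return fakeDb;
    }

    public static void main(String[] args) {
        System.out.println(downloadThenStore("https://example.org/page"));
        // prints [content of https://example.org/page]
    }
}
```

The download step knows nothing about the storage step. The two are only connected at the call site, which is the decoupling described above.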

When we don’t have to return anything, we can use a Runnable instead.

CompletableFuture.runAsync(Runnable);

Note that the above call returns immediately, as the task (Runnable) has been handed to a thread asynchronously. To wait for the task to finish (or, with supplyAsync, to get its result) we have to call get() or join() on the returned CompletableFuture.

We can also pass our own ExecutorService to CompletableFuture instead of relying on its default thread pool.

CompletableFuture.runAsync(Runnable, executorService);

The last line in the code says that once all the tasks are completed, if there is an error, return null, else join the futures and end. This is a common pattern when we have multiple CompletableFutures running concurrently.
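The pattern described here usually combines allOf(), exceptionally() and join(). The sketch below shows one way to write it under my own assumptions (the AllOfDemo class, the counter task and the pool size are illustrative); it is not the original driver code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AllOfDemo {

    // Runs taskCount copies of a task on our own pool and waits for all of them.
    static int runAll(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger done = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(CompletableFuture.runAsync(done::incrementAndGet, pool));
            }
            CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0])) // completes when all do
                    .exceptionally(error -> null)                     // on failure, yield null
                    .join();                                          // block until finished
        } finally {
            pool.shutdown();                                          // let worker threads exit
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished tasks: " + runAll(20, 4)); // prints Finished tasks: 20
    }
}
```

Swallowing the error with exceptionally(error -> null) keeps join() from throwing. In real code you would probably at least log the exception there.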

With the non-threaded implementation, downloading 100 URLs took around 24 seconds, whereas with CompletableFuture it took 6 seconds.

For the next problem, we choose Merge Sort.

I could come up with 2 possible ways to use multithreading with merge sort.

  1. Divide the array into M equal parts and sort the M parts in parallel (or concurrently), then merge the M sorted arrays in the main thread (using a priority queue).
  2. Merge the elements A[0] and A[1], A[2] and A[3] and so on with N/2 threads. Next merge A[0..1] and A[2..3] to get the sorted array A[0..3], A[4..5] and A[6..7] to get the sorted array A[4..7], and so on with N/4 threads.
    In the k-th step, we produce sorted subarrays of size 2^k by merging 2 consecutive sorted subarrays of size 2^(k-1).

Using 1st approach:

CustomInt is just a custom Tuple class.

CustomComparator() is a custom comparator for CustomInt class. The implementation is shown below.

Generate some 100 million random integers between 0 and 1000 and add them to a list.

  1. Divide the 100 million integers into 1000 equal parts. For each sub-list, sort it using CompletableFuture. Each sub-list corresponds to one task.
  2. For the threads, we are using an ExecutorService pool of 10 threads.
  3. Once all the sub-lists are sorted, merge them in the main thread using a Priority Queue.
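The three steps can be sketched as follows. This is a simplified stand-in rather than the original code: it works on an int[] instead of a list, and a three-element int[] of {value, run index, position} replaces the CustomInt tuple and its comparator. The names ChunkedSort, sort and mergeAll are mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChunkedSort {

    // Sorts 'parts' chunks on a pool of poolSize threads, then merges them on the caller thread.
    static int[] sort(int[] data, int parts, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            int chunk = Math.max(1, (data.length + parts - 1) / parts); // ceiling division
            List<CompletableFuture<int[]>> futures = new ArrayList<>();
            for (int start = 0; start < data.length; start += chunk) {
                int[] slice = Arrays.copyOfRange(data, start, Math.min(data.length, start + chunk));
                futures.add(CompletableFuture.supplyAsync(() -> {
                    Arrays.sort(slice);              // one task per chunk
                    return slice;
                }, pool));
            }
            List<int[]> sortedRuns = new ArrayList<>();
            for (CompletableFuture<int[]> f : futures) {
                sortedRuns.add(f.join());            // wait for every chunk
            }
            return mergeAll(sortedRuns, data.length);
        } finally {
            pool.shutdown();
        }
    }

    // K-way merge: the heap always holds the next unmerged element of each run.
    static int[] mergeAll(List<int[]> runs, int total) {
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (runs.get(r).length > 0) {
                heap.add(new int[] {runs.get(r)[0], r, 0}); // {value, run index, position}
            }
        }
        int[] out = new int[total];
        int k = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out[k++] = top[0];
            int[] run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.length) {
                heap.add(new int[] {run[next], top[1], next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] sorted = sort(new int[] {5, 3, 9, 1, 7, 2, 8}, 3, 2);
        System.out.println(Arrays.toString(sorted)); // prints [1, 2, 3, 5, 7, 8, 9]
    }
}
```

With M runs and N elements the merge step costs O(N log M) and runs on a single thread, so it limits how much the parallel sorting step can help.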

Time taken to sort 100 million integers with 20 threads is around 27 seconds. This is very similar to the time taken without multithreading, i.e. just using the Collections.sort() method on the entire list.

Surprisingly, with fewer integers, the time taken with multithreading is higher than without multithreading. For 10 million integers, it is 1.8 seconds with multithreading vs. 1.5 seconds without.

This can be attributed to the overhead of creating threads, assigning tasks to threads and joining the threads at the end. I verified that the step of merging the sorted arrays with a priority queue is not the culprit.

We can verify that creating and joining threads can be time consuming by implementing merge sort using the 2nd approach.

For a window size of 2, take the first 2 subarrays A[0..1] and A[2..3] and merge them in one thread to get the sorted array A[0..3]. Similarly, build the subarray A[4..7] in another thread, and so on.

Complete this step by joining all the threads before moving onto the next step.

Update the window size by doubling it and repeat the above steps.

We need to complete and join the threads for one window size before doubling the window size: if A[0..3] were merged before A[0..1] was ready we would get incorrect results, because merging into A[0..3] requires A[0..1] and A[2..3] to be sorted first.

The custom Runnable class “MergeSortedArrays” is as defined below:
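What follows is my own sketch of how such a Runnable and its driver loop might look, not the original class. The names WindowMergeSort and MergeRange are assumptions, and the window starts at 1 element so that the very first pass sorts the pairs. It deliberately spawns one thread per merge, which is only sensible for small arrays but makes the per-window create-and-join cost easy to see.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WindowMergeSort {

    // Merges the sorted ranges src[lo, mid) and src[mid, hi) into dst[lo, hi).
    static final class MergeRange implements Runnable {
        private final int[] src;
        private final int[] dst;
        private final int lo, mid, hi;

        MergeRange(int[] src, int[] dst, int lo, int mid, int hi) {
            this.src = src; this.dst = dst; this.lo = lo; this.mid = mid; this.hi = hi;
        }

        @Override
        public void run() {
            int i = lo, j = mid, k = lo;
            while (i < mid && j < hi) {
                dst[k++] = (src[i] <= src[j]) ? src[i++] : src[j++];
            }
            while (i < mid) dst[k++] = src[i++];
            while (j < hi) dst[k++] = src[j++];
        }
    }

    // Bottom-up merge sort: one pass per window size, one thread per pair of windows.
    static int[] sort(int[] input) {
        int[] src = Arrays.copyOf(input, input.length);
        int[] dst = new int[src.length];
        for (int width = 1; width < src.length; width *= 2) {
            List<Thread> threads = new ArrayList<>();
            for (int lo = 0; lo < src.length; lo += 2 * width) {
                int mid = Math.min(lo + width, src.length);
                int hi = Math.min(lo + 2 * width, src.length);
                Thread t = new Thread(new MergeRange(src, dst, lo, mid, hi));
                threads.add(t);
                t.start();
            }
            // Barrier: every merge of this width must finish before the width doubles.
            for (Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while joining", e);
                }
            }
            int[] tmp = src;   // the merged output becomes the input of the next pass
            src = dst;
            dst = tmp;
        }
        return src;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sort(new int[] {5, 3, 9, 1, 7, 2, 8})));
        // prints [1, 2, 3, 5, 7, 8, 9]
    }
}
```

Each thread writes to a disjoint range of dst, so no locking is needed within a pass. The join() calls are the only synchronization, and they are exactly the overhead measured below.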

Sorting 10 million integers with this approach takes around 5 seconds as compared to 1.8 seconds with the previous method.

As you can see, we are creating and joining threads for each window size, and there are O(log N) different window sizes to iterate over.

Thus multithreading in Java should be used strategically. For CPU bound tasks, unless the size of the data is sufficiently large, we should avoid multithreading, as the cost of creating, joining and destroying threads could be significantly higher than the cost of running the algorithm.

