
Understanding why you won’t need Python Coroutines 99.9% of the time

Abhijit Mondal

--

Having worked extensively with Python across multiple projects in my career, I have used multithreading and multiprocessing wherever I felt there was a need for them.

One of the biggest advantages of multithreading and multiprocessing is that it is quite straightforward to understand where you need them and how to modify your existing sequential code to make it concurrent.

Starting with the why: motivation for going beyond multithreading

One of the major drawbacks of multithreading is the cost of creating and destroying threads.

Take, for example, the Python code below to download 5000 image URLs:

import time
import requests
from concurrent.futures import ThreadPoolExecutor, wait

# urls: a list of 5000 image URLs (defined elsewhere)

start1 = time.perf_counter()

# start thread pool with 5000 threads
executor = ThreadPoolExecutor(5000)

start2 = time.perf_counter()

# add the thread tasks as futures
futures = []
for url in urls:
    futures += [executor.submit(requests.get, url)]

# wait for futures to be completed
wait(futures)

# Time taken without creation and destruction of threads
print(time.perf_counter()-start2)

# destroy threads
executor.shutdown()

# Time taken including creation and destruction of threads
print(time.perf_counter()-start1)

With more threads, the time to download the images goes down, but the time taken to create and destroy the threads goes up.

In the above code, using 5000 threads to download 5000 images, the difference between the two timings, i.e. the overhead of creating and destroying the threads, is almost 3.5 seconds.

Another interesting approach to concurrent programming is event-driven programming (or asynchronous programming), i.e. take an action when an event occurs, and do something else in the meantime.

For example, if I have to download 100 images, then once I have sent a request for an image URL, it is up to the OS to complete the transfer of the bytes over TCP/IP. While that transfer is in progress, the main thread of the program is sitting idle.

During that time, the main thread can initiate a request for another image URL.

Once an image is completely downloaded, we resume the code that follows the download.

I am emphasizing that it is the main thread because we will be using only the main thread to achieve concurrency, with no additional threads or processes.

High level idea of single thread based concurrency

For example, if I have an image downloader method that takes a URL as input and sends a GET request, then with only the main thread each URL has to be downloaded sequentially, because once the method is called it will not stop unless it is killed or the image is completely downloaded.

import requests

def download_url(url: str):
    r = requests.get(url)  # this is blocking I/O

    # do some processing with the downloaded data
    process(r)

What does an ideal situation look like?

After a request is initiated by the main thread and control passes over to the OS, we can call the same method with another URL instead of waiting for the image to be downloaded completely.

For this to happen, there are 2 major prerequisites:

  1. We must be able to pause the method at the point where control is transferred to the OS. And after the image is completely downloaded, it must be possible to resume the function.
  2. The part where the main thread has given up control to the OS and the image is being downloaded byte by byte should be non-blocking, i.e. the main thread should not wait for the image to be downloaded.

def download_url(url: str):
    future = non_blocking_request_method(url)
    # This returns immediately after initiating the request

    result = wait_for_main_thread_to_come_back(future)
    # main thread pauses at this line, remembers the state and comes back
    # when the future object is done.

    # do some processing with the downloaded data after the main thread returns
    process(result)

Once these two conditions are satisfied, we can use a queue to track the function calls.

  • Get the 1st URL and call the download function. The function will initiate the download request (at ‘non_blocking_request_method’) and will pause (at ‘wait_for_main_thread_to_come_back’).
  • Insert the function ‘instance’ into the back of the queue.
  • If there are pending URLs, then pick up the next URL and repeat.
  • If there are no pending URLs, then remove a function instance from the front of the queue.
  • Check the state of the download: if it is done, resume the function instance (at process(result)); otherwise re-insert it at the back of the queue.

Functions that can pause mid-execution to wait for an event and resume after the event occurs are known as coroutines.

Basic idea about coroutines

Coroutines are functions which can pause for an input during their execution.

They are similar to Python generators, but unlike generators, which pause to send a result back to the caller, coroutines pause to accept inputs from the caller.
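
To make the contrast concrete, here is a tiny, self-contained sketch (my own toy example, not part of the downloader): the generator hands values out to the caller, while the coroutine receives values from the caller:

def squares():
    # generator: pauses at yield to hand a value back to the caller
    for i in range(3):
        yield i * i

def accumulator():
    # coroutine: pauses at (yield) to receive a value from the caller
    total = 0
    while True:
        x = (yield total)
        total += x

print(list(squares()))   # [0, 1, 4]

acc = accumulator()
next(acc)                # prime the coroutine up to its first (yield)
print(acc.send(5))       # 5
print(acc.send(10))      # 15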

Here is an example of a generator that yields a text file line by line:

def read_file_generator(file_name):
    with open(file_name, "r") as f:
        # yields one line at a time instead of reading the entire file
        # into memory
        while True:
            # we do not know how many lines there are without reading the
            # entire content, so read until readline() returns an empty string
            line = f.readline()
            if not line:
                break
            yield line

if __name__ == '__main__':
    gen = read_file_generator("urls.txt")
    while True:
        try:
            line = next(gen)
            # do some processing with line
        except StopIteration:
            break

On the other hand, if we want to download the URLs read from the file in the above code, a coroutine comes in handy.

import requests

def download_url(fp):
    while True:
        # Pause here for a url to be available
        url = (yield)
        r = requests.get(url)
        fp.write(r.content)

if __name__ == '__main__':
    gen = read_file_generator("urls.txt")

    # file to store downloaded contents
    # (binary append mode, since r.content is bytes)
    fp = open("url_contents.txt", "ab")

    # Create the coroutine and advance it to the first (yield),
    # where it pauses and waits for a URL to be sent.
    coro = download_url(fp)
    next(coro)

    while True:
        try:
            url = next(gen)
            # do some processing with the url

            # send the url to the coroutine.
            # after this step, the coroutine resumes, downloads the url,
            # writes the content to the file, then loops back to (yield)
            # and waits for the next input.
            coro.send(url)
        except StopIteration:
            break

    fp.close()

You might ask why we need a coroutine for the purpose shown above, since we could just call requests.get() after fetching each URL from the generator, like here:

if __name__ == '__main__':
    gen = read_file_generator("urls.txt")

    # binary append mode, since r.content is bytes
    fp = open("url_contents.txt", "ab")

    while True:
        try:
            url = next(gen)
            r = requests.get(url)
            fp.write(r.content)
        except StopIteration:
            break

    fp.close()

The thing is that a single coroutine instance is pointless for the above use case. The real idea is to achieve concurrency by using multiple coroutines.

But recall that we cannot use threads to run multiple coroutines in parallel, because that would defeat their purpose: we could just run the requests on threads without any coroutines at all.

As we saw earlier, we can use a queue (or a similar linked-list-like data structure) to pick up new or pending tasks from the head and push in-progress tasks back at the tail.
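
As a toy sketch of that idea (my own example, with made-up countdown tasks standing in for real non-blocking I/O), a single-threaded round-robin scheduler over a deque looks like this:

from collections import deque

def countdown(label: str, n: int):
    # toy coroutine: yields control back to the scheduler n times
    while n > 0:
        yield          # pause; the scheduler decides when to resume us
        n -= 1
    print(label, "done")

def run(tasks):
    # round-robin scheduler: resume each task once, re-queue it if not finished
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)           # resume until the next yield
            queue.append(task)   # still pending, push it to the back
        except StopIteration:
            pass                 # finished, drop it

run([countdown("task-A", 2), countdown("task-B", 4)])
# the two tasks make progress in alternating turns on a single thread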

Implementing a simple “non-relevant” application with coroutines

Let’s look at the following problem:

Given a list of strings, find the edit distance between each pair and then filter all pairs which have an edit distance less than or equal to 1.

Here is a Python implementation of the Levenshtein distance algorithm:

# Compute the Levenshtein distance between str1 and str2
def edit_distance(str1: str, str2: str) -> int:
    m, n = len(str1), len(str2)

    # d[i][j] = edit distance between str1[:i] and str2[:j]
    d: list = [[0]*(n+1) for _ in range(m+1)]

    for i in range(m+1):
        d[i][0] = i  # delete all i characters of str1
    for j in range(n+1):
        d[0][j] = j  # insert all j characters of str2

    for i in range(1, m+1):
        for j in range(1, n+1):
            if str1[i-1] == str2[j-1]:
                d[i][j] = d[i-1][j-1]
            else:
                d[i][j] = 1 + min(d[i-1][j-1], d[i-1][j], d[i][j-1])

    return d[-1][-1]
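
A quick sanity check on a few string pairs (the test strings are my own, chosen for illustration):

print(edit_distance("kitten", "sitting"))  # 3
print(edit_distance("cat", "cats"))        # 1
print(edit_distance("abc", "abc"))         # 0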

For the queue implementation, we choose to use a priority queue (min-heap) instead of a double-ended queue.

Using a priority queue, we can handle the following situations better:

  • Different tasks may have different priorities for completion.
  • Different tasks may have different estimated completion times. For example, if task A normally requires 10 ms and task B requires 15 ms, then even if task B was started before task A, we will check on A before B if we use a priority queue instead of a standard queue (see the small heapq sketch below).
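
Here is a minimal sketch of that ordering with Python's heapq module (the task names and durations are made up for illustration):

import heapq
import time

heap = []
now = time.time() * 1000  # current time in milliseconds

# task-B was pushed first but is expected to finish in 15 ms;
# task-A needs only 10 ms, so it surfaces at the heap root first
heapq.heappush(heap, (now + 15, "task-B"))
heapq.heappush(heap, (now + 10, "task-A"))

print(heapq.heappop(heap)[1])  # task-A
print(heapq.heappop(heap)[1])  # task-B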

Also, we use an additional key-value data structure, "event_map", to store the coroutine corresponding to an id that we generate every time we register a coroutine in our "event loop".

Below is an implementation of how we can register an event in our own “event loop”.

import heapq
import time
import uuid

# Register coroutine with data in heap and event_map
def register_coro_event_loop(heap: list, event_map: dict, coro, data,
                             estimated_completion_duration: int = 1):
    id: str = str(uuid.uuid4())

    # event_map is used to map the generated id with the coroutine because
    # we need to be able to call send on the coroutine once we extract
    # the root node id of the heap
    event_map[id] = (coro, data)

    # The node becomes eligible for processing after
    # "estimated_completion_duration" milliseconds
    heapq.heappush(heap, (time.time()*1000 + estimated_completion_duration, id))

Since for every coroutine we need to call next(coro) once before we can send any data, we can write a Python decorator to simplify our code:

# Decorator function for coroutines
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start
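
For instance, with this decorator a toy coroutine (hypothetical, just for illustration) can be used without the explicit priming call:

@coroutine
def printer():
    while True:
        item = (yield)      # pause here until someone calls send()
        print("got:", item)

p = printer()       # the decorator has already advanced it to the first yield
p.send("hello")     # prints: got: hello
p.send("world")     # prints: got: world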

You might notice that this problem is actually CPU intensive, so there is no way we can achieve any real concurrency with coroutines here. That is why I called it a "non-relevant" problem at the beginning.

To give it a new direction, we will also add a sleep of 10 milliseconds after the edit distance is computed for a given pair of strings.

@coroutine
def calculate_edit_distances(output: list, heap: list, event_map: dict):
    while True:
        # Wait for input data (similar to await in asyncio)
        coro, data = (yield)

        str1, str2 = data
        d: int = edit_distance(str1, str2)

        # Register custom non-blocking sleep
        sleep_duration = 10  # ms

        sleep_coro = custom_sleep(sleep_duration, coro, heap, event_map)

        register_coro_event_loop(
            heap, event_map, sleep_coro, ("wake up!!!"), sleep_duration)

        # Wait for sleep to be over and receive wake up signal.
        # This is simulating some non-blocking I/O in real life
        # such as a non-blocking HTTP request or non-blocking file I/O
        sleep_coro, sleep_data = (yield)

        # Register filtering of similar strings coroutine
        filter_coro = filter_similars(coro, heap, event_map)

        register_coro_event_loop(heap, event_map, filter_coro, (str1, str2, d))

        # Wait for filtering to be over.
        filter_coro, filter_data = (yield)

        # Get the filtered string pair and add to the output list
        str1, str2 = filter_data
        output += [(str1, str2)]

Note that the time.sleep() function in Python is blocking, i.e. the main thread will wait for the sleep to finish. Thus we need some sort of non-blocking sleep. We could obviously use asyncio.sleep(), but instead of going that route, we will implement our own simple non-blocking sleep logic.

This is very simple.

When the sleep task is created in the queue, we store the timestamp at which it is expected to wake up. Then, after some delay, we check whether the current timestamp is past the wake-up time; if it is, we mark the task as done and resume the coroutine that follows the sleep.

Otherwise, we re-insert the sleep task at the back of the queue.

@coroutine
def custom_sleep(duration: int, next_coro, heap: list, event_map: dict):
    # Next time when to resume execution
    wake_up_time: float = time.time()*1000 + duration

    while True:
        coro, data = (yield)
        curr_time = time.time()*1000

        if curr_time < wake_up_time:
            remaining = wake_up_time - curr_time

            # Sleep is not completed yet, re-register the sleep task.
            # The new evaluation delay is the remaining time, but at least 1 ms
            # so that we do not re-evaluate it too frequently.
            register_coro_event_loop(
                heap, event_map, coro, data, max(remaining, 1))
        else:
            # Sleep is completed, send wake up signal
            try:
                next_coro.send((next_coro, data))
            except StopIteration:
                # If the downstream coroutine is closed then exit this coroutine
                break

After the edit distance is computed for a pair of strings and the non-blocking sleep is over, we send the pair of strings and their distance to another coroutine that does the filtering job:

@coroutine
def filter_similars(next_coro, heap: list, event_map: dict):
    while True:
        # Wait for input data (similar to await in asyncio)
        coro, data = (yield)

        # Inputs are 2 strings and their edit distance
        str1, str2, d = data

        # Filter strings where edit distance is less than or equal to 1
        if d <= 1:

            # Register custom non-blocking sleep
            sleep_duration = 10  # ms
            sleep_coro = custom_sleep(sleep_duration, coro, heap, event_map)
            register_coro_event_loop(
                heap, event_map, sleep_coro, ("wake up!!!"), sleep_duration)

            # Wait for sleep to be over and receive wake up signal.
            # This is simulating some non-blocking I/O in real life
            # such as a non-blocking HTTP request or non-blocking file I/O
            sleep_coro, sleep_data = (yield)

            # Send output back to edit distance coroutine
            try:
                next_coro.send((next_coro, (str1, str2)))
            except StopIteration:
                # If the downstream coroutine is closed then exit this coroutine
                break

Again, we add an artificial non-blocking sleep of 10 ms to the filtering coroutine to simulate non-blocking I/O operations.

Here is the main driver program to execute the above coroutines:

import heapq
import random
import string
import time

if __name__ == '__main__':
    # Generate random strings
    s = string.ascii_lowercase
    strs = [''.join(random.choices(s, k=random.randint(3, 10))) for i in range(1000)]

    start = time.perf_counter()

    output = []
    heap, event_map = [], {}

    for i in range(len(strs)):
        for j in range(i+1, len(strs)):
            coro = calculate_edit_distances(output, heap, event_map)
            # Register coroutine
            register_coro_event_loop(heap, event_map, coro, (strs[i], strs[j]))

    # Unroll the event loop
    while len(heap) > 0:
        # Extract heap root if current time > earliest evaluation time
        if time.time()*1000 >= heap[0][0]:
            _, id = heapq.heappop(heap)
            coro, data = event_map[id]

            try:
                coro.send((coro, data))
            except StopIteration:
                print("Coroutine closed !!!")

    print(output)

    print(time.perf_counter()-start)

After we register all the coroutines, we start unrolling the event loop: remove the task with the earliest evaluation time first, then send it whatever data it needs to resume.

Notice the pattern of the data that we are sending through coroutine send calls.

Along with the data required by the coroutine to resume the function, we are also sending the current coroutine instance.

This is because a coroutine might want to start another coroutine from within itself, the way the edit distance coroutine and the filter coroutine start a custom sleep coroutine. When the loop later resumes the child coroutine, the child can call its parent coroutine’s send method and let the parent resume.

sleep_coro = custom_sleep(sleep_duration, coro, heap, event_map)

register_coro_event_loop(
    heap, event_map, sleep_coro, ("wake up!!!"), sleep_duration)

In order to compare the coroutine version with a sequential version of the same implementation, we can write the sequential version as:

if __name__ == '__main__':
    # Generate the same kind of random strings as in the coroutine version
    s = string.ascii_lowercase
    strs = [''.join(random.choices(s, k=random.randint(3, 10))) for i in range(1000)]

    start = time.perf_counter()
    output = []

    for i in range(len(strs)):
        for j in range(i+1, len(strs)):
            d = edit_distance(strs[i], strs[j])
            # the non-blocking sleep here becomes a blocking sleep of 10 ms
            time.sleep(0.01)

            if d <= 1:
                # the non-blocking sleep here becomes a blocking sleep of 10 ms
                time.sleep(0.01)
                output += [(strs[i], strs[j])]

    print(output)

    print(time.perf_counter()-start)

With 100 strings we would have roughly 10,000 pairs; assuming the edit distance takes about 0.1 ms per pair and the sleeps add up to 20 ms (10 + 10), the total time for the blocking implementation comes to around 200 s.

Compare this with the coroutine version, which takes only 0.15 s.

Sad reality of working with coroutines

Reality Check 1: Concurrency with coroutines makes sense only for I/O intensive workloads (similar to multithreading in Python).

If you want to make CPU intensive workloads concurrent, either use multiprocessing, or use multithreading in a language such as Java, Go or C++.
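
For example, here is a minimal multiprocessing sketch (my own, not from the original code) that parallelizes the same edit distance workload, assuming the edit_distance function and the strs list defined earlier:

from itertools import combinations
from multiprocessing import Pool

if __name__ == '__main__':
    pairs = list(combinations(strs, 2))   # all unique string pairs

    # one worker process per CPU core by default; each computes edit distances
    with Pool() as pool:
        dists = pool.starmap(edit_distance, pairs)

    output = [pair for pair, d in zip(pairs, dists) if d <= 1]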

Reality Check 2: Except for a few asyncio implementations, most existing Python libraries for I/O are blocking.

Concurrency with coroutines that call blocking methods is not useful.

The awkward thing about non-blocking or asynchronous methods is that any method they call also needs to be non-blocking. A single blocking I/O call makes the entire call stack blocking.

This is why you will notice that in asyncio, if you define a function as async, the I/O functions it calls need to be async (awaitable) too; an ordinary blocking call inside it blocks the whole event loop.

Thus there is no easy way to make existing I/O methods non-blocking if they are already blocking.

These include many Python APIs for databases, caches, etc.
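
Here is a minimal asyncio sketch (my own, with a made-up fetch coroutine) of why a single blocking call matters: awaiting the non-blocking asyncio.sleep lets 100 tasks overlap, whereas a blocking time.sleep inside any of them would serialize everything again:

import asyncio
import time

async def fetch(i):
    # asyncio.sleep is non-blocking: while this task "sleeps", others run.
    # Replacing it with time.sleep(0.01) would block the whole event loop.
    await asyncio.sleep(0.01)
    return i

async def main():
    start = time.perf_counter()
    await asyncio.gather(*(fetch(i) for i in range(100)))
    print(time.perf_counter() - start)   # roughly 0.01s, not 100 * 0.01s

asyncio.run(main())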

Reality Check 3: Unlike converting a single-threaded application to a multithreaded one, it is not trivial to come up with an approach for converting a non-concurrent, non-coroutine application into a concurrent, coroutine-based one.
