source: quanta magazine

Non-blocking image downloader using sockets in Python

Abhijit Mondal


As an ML and data engineer, I am quite often required to download lots of images (for Computer Vision projects) from different datasets available over the internet. Using Python's requests.get() to download thousands of images, each about 50KB in size on average, can be very time consuming.

As a benchmark, it currently takes around 67 seconds to download 100 images on my laptop using requests.get(url) in Python. For 10000 images, multiply this by 100, i.e. 6700 seconds or around 2 hours.

Obviously this is not the only solution out there:

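import requests

def download(urls):
    # Download the URLs one at a time; each call blocks until it completes
    for url in urls:
        r = requests.get(url)
        process(r)  # process() stands for whatever is done with each response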

Show me the alternatives

A few well-known approaches that can speed up your image downloads:

Multithreading using Threads:

from threading import Thread
import requests

# Using a separate thread for each URL.
# For a small number of URLs this makes sense, but for 10000 URLs this will
# create and destroy 10K threads.
def download(urls):
    mythreads = []

    for url in urls:
        mythread = Thread(target=requests.get, args=(url,))
        mythread.daemon = True
        mythread.start()

        mythreads.append(mythread)

    # Wait for all downloads to finish
    for mythread in mythreads:
        mythread.join()

Multithreading using ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import requests

# Using ThreadPoolExecutor with 100 threads.
def download(urls):
    with ThreadPoolExecutor(100) as executor:
        executor.map(requests.get, urls)

Multiprocessing using ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import requests

# Using ProcessPoolExecutor with 10 processes.
def download(urls):
    with ProcessPoolExecutor(10) as executor:
        executor.map(requests.get, urls)

Apart from the performance overhead of creating and destroying threads and processes in any OS, we might face other challenges as well when working in shared environments, cloud VMs etc.

Most often, your download program is not the only program running on the VM.

  • The maximum number of threads is often set by the system admin. In Linux, we can see the maximum number of threads by running the following command:
    cat /proc/sys/kernel/threads-max
  • Although we can increase the maximum number of threads per process, with a large number of threads (in the 1000s) there is an overhead of context switching.
    If your VM has 4 CPU cores, then at most 4 processes or threads can run truly in parallel. If we have more threads than CPU cores, then only 4 will be working at a time. When thread A exits or is sitting idle (waiting for I/O), another thread B can jump in. Thus the OS needs to switch its context from A to B. With 10K threads this overhead can be significant.
  • In multiprocessing, each process gets its own copy of the variables in memory. Thus for an array of size 100MB, creating 4 processes means the array takes up 400MB in total for all 4 copies.
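The limits discussed above can be inspected from Python before choosing a pool size. The following is a minimal sketch: the helper names `system_thread_limit` and `pick_worker_count`, and the cap of 32 workers, are illustrative assumptions and not values taken from the benchmark in this post.

```python
import os
from typing import Optional

def system_thread_limit(path: str = "/proc/sys/kernel/threads-max") -> Optional[int]:
    """Return the system-wide thread limit on Linux, or None if it cannot be read."""
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None

def pick_worker_count(n_urls: int, cap: int = 32) -> int:
    """Pick a pool size: never more workers than URLs, never more than cap."""
    if n_urls <= 0:
        return 1
    return min(n_urls, cap)

if __name__ == "__main__":
    print("CPU cores:", os.cpu_count())
    print("threads-max:", system_thread_limit())
    print("workers for 10000 URLs:", pick_worker_count(10_000))
```

A bounded pool like this keeps the number of threads independent of the number of URLs, which is the main way to limit context-switching overhead when the URL list grows.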

In comes asyncio

Asyncio removes the dependency on threads and uses only the main thread to download multiple images concurrently.

It takes advantage of non-blocking I/O by using something known as “coroutines” with “event loops”.

import asyncio
from aiohttp import ClientSession

# An image/url downloader using async
async def fetch_url_data_async(session:ClientSession, url:str)->None:
    try:
        async with session.get(url, timeout=60) as response:
            await response.read()
    except Exception as e:
        print(e)

async def fetch_async(urls:list[str]):
    tasks = []
    async with ClientSession() as session:
        for url in urls:
            # Create one coroutine per URL; nothing runs yet
            tasks.append(fetch_url_data_async(session, url))

        # Run all the coroutines concurrently on the event loop, while
        # the session is still open
        await asyncio.gather(*tasks)
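To see why a single thread is enough, here is a small self-contained sketch in which `asyncio.sleep` stands in for waiting on the network, so it runs without aiohttp or an internet connection. The names `fake_download`, `download_all` and `run_demo` are hypothetical and exist only for this illustration.

```python
import asyncio
import time

async def fake_download(i: int, delay: float = 0.1) -> int:
    # asyncio.sleep stands in for waiting on the network; while this
    # coroutine waits, the event loop is free to run the others
    await asyncio.sleep(delay)
    return i

async def download_all(n: int) -> list:
    # gather runs all coroutines concurrently and keeps results in order
    return await asyncio.gather(*(fake_download(i) for i in range(n)))

def run_demo(n: int = 50):
    start = time.perf_counter()
    results = asyncio.run(download_all(n))
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = run_demo()
    print(len(results), "fake downloads in", round(elapsed, 2), "seconds")
```

Run one after another, 50 waits of 0.1 s would take about 5 s; run concurrently they overlap and finish in roughly the time of a single wait. The aiohttp version above can be started the same way, with `asyncio.run(fetch_async(urls))`.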

Read my post on asyncio here : Think twice before using asyncio in Python | by Abhijit Mondal | Jan, 2023 | Medium

and the coroutine functionality that enables writing asynchronous or non blocking codes in Python : Understanding why you won’t need Python Coroutines 99.9% of the time | by Abhijit Mondal | Feb, 2023 | Medium

In this post I am going to show how you can write your own non-blocking image downloader without using asyncio.

Going back to good old sockets

In order to leverage non-blocking I/O for HTTP GET requests, we cannot use the requests.get() method because it is blocking. We have to use sockets to implement our own custom downloader.
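To make this concrete, here is a minimal sketch of the bytes that make up a plain HTTP GET request, which is what we have to write to the socket ourselves once requests is out of the picture. The helper name `build_get_request` is hypothetical, and the sketch assumes HTTP/1.0 with a Host header and no other headers.

```python
from urllib.parse import urlparse

def build_get_request(url: str) -> bytes:
    """Build the raw bytes of a minimal HTTP/1.0 GET request for url."""
    parts = urlparse(url)
    # An empty path means the root of the site
    path = parts.path or "/"
    if parts.query:
        path += "?" + parts.query
    lines = [
        f"GET {path} HTTP/1.0",
        f"Host: {parts.netloc}",
        "",  # blank line marks the end of the headers
        "",
    ]
    return "\r\n".join(lines).encode("ascii")

if __name__ == "__main__":
    print(build_get_request("https://example.com/img/cat.jpg?size=large"))
```

Each header line ends with `\r\n`, and an empty line terminates the request. The server starts replying only after it has seen that empty line.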

Here is our method to create a client socket from the given URL :

import socket
import ssl
from urllib.parse import urlparse, ParseResult

def create_socket(url:str, ssl_context:ssl.SSLContext):
    # Parse the input url to get the hostname and path
    parse:ParseResult = urlparse(url)

    # Using HTTP 1.0, we can also use HTTP 1.1 but we need to add
    # Connection: close
    get_request:str = f"GET {parse.path or '/'} HTTP/1.0\r\nHost: {parse.netloc}\r\n\r\n"

    # Socket needs bytes as input and not string
    request_bytes:bytes = get_request.encode()

    # TCP/IP socket IPv4 protocol
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Need to set the socket to non blocking mode
    sock.setblocking(False)

    # Use SSL for HTTPS (port number 443)
    sock = ssl_context.wrap_socket(sock, server_hostname=parse.netloc)

    # connect() would raise BlockingIOError on a non-blocking socket;
    # connect_ex() returns an error code instead
    sock.connect_ex((parse.netloc, 443))

    return sock, request_bytes

A few things to note here:

  • We are creating one socket for each URL we want to download. Since there are limits on the number of active socket connections, we have to be careful when we are downloading, say, a million images.
  • We are connecting using SSL because the URLs we access use the HTTPS protocol.
  • When we set the socket to be non-blocking, calling socket.connect will throw BlockingIOError because it returns before the connection has been established. Use socket.connect_ex instead, which returns an error code rather than raising.
  • Since we are issuing a GET request over HTTPS, we need to connect to the image server on port 443.
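The behaviour of connect_ex on a non-blocking socket can be tried out without any remote server. The sketch below is an illustration under assumptions: it targets a POSIX system, uses plain TCP without SSL, connects to a throwaway listener on 127.0.0.1, and the names `nonblocking_connect` and `demo` are hypothetical.

```python
import errno
import select
import socket

def nonblocking_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Start a non-blocking connect and report whether it succeeded."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        # Returns immediately, usually with EINPROGRESS instead of raising
        rc = sock.connect_ex((host, port))
        if rc not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
            return False
        # The socket becomes writable once the connection attempt finishes
        _, writable, _ = select.select([], [sock], [], timeout)
        if not writable:
            return False
        # SO_ERROR is 0 if the connection was established
        return sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0
    finally:
        sock.close()

def demo() -> bool:
    # A local listener on an OS-chosen free port stands in for the image server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    try:
        return nonblocking_connect("127.0.0.1", server.getsockname()[1])
    finally:
        server.close()

if __name__ == "__main__":
    print("connected:", demo())
```

The point to notice is that the connection is confirmed later, when select reports the socket as writable, and not at the time connect_ex returns.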

The code where the actual magic works is given below:

import select
import socket
import ssl
import time

import certifi

if __name__ == '__main__':
    # urls is assumed to be a list of HTTPS image URLs defined earlier
    start = time.perf_counter()

    # SSL Context for socket
    context:ssl.SSLContext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(certifi.where())

    sockets, request, response = [], {}, {}

    # Initialize the non blocking sockets
    for url in urls:
        sock, get_req = create_socket(url, context)

        # Add to list of sockets
        sockets.append(sock)

        # Maps needed to track the request and response for each socket
        request[sock] = get_req
        response[sock] = b''

    # To track which sockets have been completely read off
    r_completed = set()
    # To track which sockets have already sent their GET request
    w_completed = set()

    while True:
        # read_socks contains only those sockets which have input data ready.
        # write_socks contains only those sockets which are ready to send data.
        # A socket that is ready for both appears in both lists.
        # We keep a minimal timeout since we are using this inside a while True loop

        read_socks, write_socks, _ = select.select(sockets, sockets, [], 0.01)

        # Check which write sockets are ready to send the GET request.
        # Note that before we can receive the image bytes, we need to
        # send the GET request to the image server.

        for r in write_socks:
            # Only for sockets that have not sent their request yet
            if r not in w_completed:
                try:
                    r.send(request[r]) # Non-blocking send
                    w_completed.add(r)
                except socket.error:
                    # Might happen that connect has not completed yet
                    pass

        for r in read_socks:
            # Only for sockets not completely exhausted
            if r not in r_completed:
                try:
                    # Receive up to 4096 bytes in buffer
                    data = r.recv(4096)

                    if not data:
                        # socket has been exhausted, mark it as completed
                        r_completed.add(r)
                    else:
                        # not exhausted, add to existing data for this socket
                        response[r] += data

                except socket.error:
                    # sometimes this will throw an exception if the socket is
                    # not ready for recv since the socket is non-blocking.
                    pass

        # check if all sockets have been exhausted, then break out
        if len(r_completed) == len(sockets):
            break

    # Close all sockets
    for sock in sockets:
        sock.close()

    print(time.perf_counter()-start)

Let’s look at these block by block:

Step 1: Initialize our SSL Context for connecting sockets to HTTPS endpoints.

Step 2: For each URL, initialize the non-blocking socket.

Note that here we do not make a blocking connect to the image server, because the cost of a blocking connect is high and we can leverage select.select for non-blocking connects.

Step 3: The crux of non blocking I/O is the select.select method.

It accepts 3 lists of sockets (or file descriptors in Linux). The first list holds the sockets we want to ‘recv’ from, also known as the read sockets. The 2nd list holds the sockets we want to ‘send’ on, also known as the write sockets. The 3rd list holds the sockets to be monitored for exceptional conditions such as errors.

The last parameter is the timeout. The select call will return after at most timeout seconds, even if no socket is ready. Thus select will not block indefinitely.

It returns 3 lists. The first is the subset of the read sockets that are ready, i.e. incoming data is already present in these sockets for reading. The 2nd is the subset of the write sockets that are ready to have data sent on them. The 3rd is the subset of sockets with an exceptional condition.

Thus we can monitor which sockets are ready to send and receive data.
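The readiness lists can be observed in isolation with a connected pair of local sockets. This is a minimal sketch that uses `socket.socketpair()` in place of a real server connection; the function name `select_demo` is hypothetical.

```python
import select
import socket

def select_demo():
    # a and b are two ends of a connected local socket pair
    a, b = socket.socketpair()
    a.setblocking(False)
    b.setblocking(False)
    try:
        # Nothing has been sent yet: b is writable but not readable
        readable, writable, _ = select.select([b], [b], [], 0.01)
        before = (b in readable, b in writable)

        a.send(b"hello")

        # Wait up to 1 second for the data to show up on b
        readable, _, _ = select.select([b], [], [], 1.0)
        # b can still be written to, so it is ready for both operations
        _, writable, _ = select.select([], [b], [], 0)
        after = (b in readable, b in writable)

        data = b.recv(4096)
        return before, after, data
    finally:
        a.close()
        b.close()

if __name__ == "__main__":
    print(select_demo())
```

Before any data is sent, the socket shows up only in the write list. Once data has arrived it is reported as ready for reading as well, so the same socket can be ready for both operations at once.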

We also maintain 2 separate sets, completed read sockets and completed write sockets. Whenever we are reading from a socket if all the data has been read, then we do not need to process this socket anymore and can mark it as completed. Similarly for write sockets.

We exit once all read sockets have been marked, i.e. all the images have been downloaded.

For each iteration of the while loop the following things can happen:

  • Some sockets are still not connected.
  • Some sockets are connected and ready to send the GET request.
  • Some sockets have sent the GET request and now have image bytes waiting to be read. Read up to 4096 bytes off them.
  • Some sockets have sent the GET request, but the server has not started sending image bytes yet.
  • Some sockets have already read all the image bytes and have been marked as completed.

There is another tradeoff we have not discussed.

When the number of sockets in use is high, looping over all of them frequently (low select timeout) inside the while True loop is inefficient. Thus we can increase the select timeout in such a case.

For 100 images, the above code takes only 4.3 seconds as compared to 67 seconds with requests.get() method.
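The loop above leaves the raw bytes of each HTTP response, headers included, in `response[sock]`. To get at the image itself, the headers have to be split off. The following is a minimal sketch, assuming the server answered our HTTP/1.0 request with an uncompressed, non-chunked body; the function name `split_http_response` is hypothetical.

```python
from typing import Dict, Tuple

def split_http_response(raw: bytes) -> Tuple[int, Dict[str, str], bytes]:
    """Split a raw HTTP response into (status code, headers, body)."""
    # Headers and body are separated by the first empty line
    head, sep, body = raw.partition(b"\r\n\r\n")
    if not sep:
        raise ValueError("incomplete HTTP response: no end of headers found")

    lines = head.decode("iso-8859-1").split("\r\n")

    # Status line looks like: HTTP/1.0 200 OK
    parts = lines[0].split(" ", 2)
    if len(parts) < 2 or not parts[1].isdigit():
        raise ValueError("malformed status line: " + lines[0])
    status = int(parts[1])

    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()

    return status, headers, body

if __name__ == "__main__":
    raw = (b"HTTP/1.0 200 OK\r\nContent-Type: image/jpeg\r\n"
           b"Content-Length: 4\r\n\r\n\xff\xd8\xff\xe0")
    status, headers, body = split_http_response(raw)
    print(status, headers["content-type"], len(body))
```

Checking the status code before saving the body avoids writing error pages to disk as if they were images.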

select.select is not the only non-blocking solution available for sockets. Another approach is to use epoll (only for Linux).

epoll is like an internal queue of socket events. Once we register a socket with epoll, the kernel monitors it for the events we asked for. Using epoll.poll() we can then fetch the (file descriptor, event) pairs for the sockets that currently have events pending.

Here is an epoll based implementation for a non blocking image downloader:

import select
import socket
import ssl
import time

import certifi

if __name__ == '__main__':
    # urls is assumed to be a list of HTTPS image URLs defined earlier
    start = time.perf_counter()

    # SSL Context for socket
    context:ssl.SSLContext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(certifi.where())

    # Create epoll object
    epoll = select.epoll()

    request, response, sent, conns = {}, {}, {}, {}

    for url in urls:
        sock, get_req = create_socket(url, context)

        # Map needed to track the requests for each socket
        request[sock] = get_req

        # Track response from each socket
        response[sock] = b''

        # Track number of bytes sent for GET request
        sent[sock] = 0

        # Track file descriptor number to socket object
        conns[sock.fileno()] = sock

        # Register current socket with epoll in output mode i.e. monitor
        # sockets ready to send GET requests
        epoll.register(sock.fileno(), select.EPOLLOUT)

    # Track all completed downloads
    completed = set()

    while True:
        # Poll for ready events with a timeout of 0.001 s
        events = epoll.poll(0.001)

        # sockets and events which are ready, similar to select.select
        for fileno, event in events:
            if fileno not in completed:
                # Get socket from file number
                sock = conns[fileno]

                # If event is read event i.e. ready for recv
                if event & select.EPOLLIN:
                    while True:
                        try:
                            # Read up to 50KB at a time
                            data:bytes = sock.recv(1024*50)

                            if not data:
                                # Read socket is exhausted so break out
                                completed.add(fileno)
                                break
                            else:
                                # Append incoming data to response
                                response[sock] += data

                        except socket.error as e:
                            # No more data available right now (the socket is
                            # non-blocking), so stop reading until the next event
                            break

                # If event is write event i.e. ready for send
                elif event & select.EPOLLOUT:
                    try:
                        sent[sock] += sock.send(request[sock][sent[sock]:])

                        # If all data for the GET request has been sent, then tell
                        # epoll to monitor read events from the same socket.
                        if sent[sock] == len(request[sock]):
                            epoll.modify(sock.fileno(), select.EPOLLIN | select.EPOLLET)
                    except socket.error:
                        # Might happen that connect has not completed yet
                        pass

        # All images read
        if len(completed) == len(conns):
            break

    epoll.close()

    # Close all sockets
    for sock in conns.values():
        sock.close()

    print(time.perf_counter()-start)

epoll has 2 modes for monitoring read and write events: Edge Triggered and Level Triggered modes.

In edge triggered mode, epoll.poll() will return an event on a socket only once after a read or write event occurred on that socket. For reading, the data needs to be read as soon as possible from the socket, because in the next poll(), if no new event is present on this socket, the socket will not be returned by epoll.poll().

To use Edge triggered mode with read monitors, we use:

select.EPOLLIN | select.EPOLLET

In level triggered mode, if an event occurred at a socket, then till all the data has been read from the socket, poll() will return the socket and the event. Thus we can read the data with multiple polls.

Level triggered is the default mode.
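The difference between the two modes can be observed with a local socket pair. The sketch below is Linux-only, since it relies on `select.epoll`. It sends one byte, never reads it, and counts how many consecutive polls report the socket. The function name `count_reports` is hypothetical.

```python
import select
import socket

def count_reports(flags: int, polls: int = 3) -> int:
    """Send data once, never read it, and count how many polls report it."""
    a, b = socket.socketpair()
    ep = select.epoll()
    try:
        ep.register(b.fileno(), flags)
        a.send(b"x")

        reports = 0
        for _ in range(polls):
            # poll returns a list of (fileno, event) pairs, empty on timeout
            if ep.poll(0.1):
                reports += 1
        return reports
    finally:
        ep.close()
        a.close()
        b.close()

if __name__ == "__main__":
    print("level triggered:", count_reports(select.EPOLLIN))
    print("edge triggered:", count_reports(select.EPOLLIN | select.EPOLLET))
```

In level triggered mode every poll reports the socket for as long as the unread byte is sitting there. In edge triggered mode only the first poll does, which is why the edge triggered reader has to keep calling recv until the socket has no more data to give.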

For 100 images, the epoll code takes only 1.4 seconds as compared to 67 seconds with requests.get() method. Clearly we have a winner here!

One can optimize the performance by playing around with the timeout, number of bytes to read into the buffer and so on.
