In this post I am going to elaborate on the lessons learnt while building distributed web crawlers on the cloud (specifically AWS). In fact, I am going to demonstrate 2 different applications of the crawler:
- Crawling Wikipedia: this is how standard crawlers such as Google's work, following links from one page to another using breadth-first search (BFS).
- Crawling Amazon: starting from a seed set of search queries, I am going to crawl the search results for product titles, then follow the product links and crawl product metadata from the details pages. This is a two-level BFS.
I used Python for the distributed crawler.
How to crawl web links? What data structures and algorithms to use?
The standard algorithm for following links from one page to another is Breadth First Search (BFS).
Pop a URL from a FIFO Queue, fetch the contents of the URL, identify all the URLs in the content and insert those URLs at the end of the FIFO Queue. Continue this process until there are no more URLs in the queue, we have crawled URLs up to a specified level of the BFS graph, or the crawling process has been stopped.
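A minimal sketch of this loop, assuming a hypothetical `get_links(url)` callable that fetches a page and returns the URLs it links to. It is passed in as a parameter so the traversal can be shown without network access. Each queue entry carries its depth so the crawl can stop at a given BFS level, and a `seen` set ensures no URL is queued twice:

```python
from collections import deque


def bfs_crawl(seed_urls, get_links, max_depth=2, max_pages=1000):
    """Breadth-first crawl; returns URLs in the order they were crawled."""
    queue = deque((url, 0) for url in seed_urls)   # FIFO of (url, depth)
    seen = set(seed_urls)                           # never enqueue a URL twice
    crawled = []
    while queue and len(crawled) < max_pages:
        url, depth = queue.popleft()                # pop from the front
        crawled.append(url)
        if depth == max_depth:                      # do not expand past the level limit
            continue
        for link in get_links(url):
            if link not in seen:
                seen.add(link)
                queue.append((link, depth + 1))     # push at the back
    return crawled
```

`collections.deque` gives O(1) appends at the back and pops from the front. In a real crawl, `get_links` would combine the fetching and parsing steps described next.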
How to fetch URLs and parse content?
For this I am using Python's ‘requests’ library to fetch pages and the ‘BeautifulSoup’ library to parse their contents. You may also need the ‘lxml’ library, which BeautifulSoup can use as its parser.
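Parsing essentially means collecting the href of every anchor tag and resolving it against the page URL. The sketch below uses only the standard library (`html.parser` and `urllib.parse`) so it runs without extra installs. It is a stand-in for the BeautifulSoup code, not a copy of it; with BeautifulSoup the rough equivalent is `BeautifulSoup(html, "lxml").find_all("a", href=True)` on the text returned by `requests.get(url, timeout=...)`. The names `LinkExtractor` and `extract_links` are hypothetical.

```python
from html.parser import HTMLParser
from urllib.parse import urldefrag, urljoin


class LinkExtractor(HTMLParser):
    """Collect absolute http(s) links from <a href="..."> tags."""

    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if not href:
            return
        # Resolve relative links against the page URL and drop "#fragment" parts.
        absolute, _fragment = urldefrag(urljoin(self.base_url, href))
        if absolute.startswith(("http://", "https://")):   # skip mailto:, javascript:, ...
            self.links.append(absolute)


def extract_links(html, base_url):
    parser = LinkExtractor(base_url)
    parser.feed(html)
    parser.close()
    return parser.links
```

In the crawler, `html` would be the `.text` of the response returned by `requests.get`.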
How to avoid being classified as a bot or a DDoS attacker? How to fake human browsing?
There are lots of different strategies and viewpoints regarding this. One could use Selenium drivers to simulate human browsing, but for this crawler I am using a combination of the following techniques:
User-Agents: use a proper user-agent in the header of each request. A browser-like user-agent makes the request look as if it comes from a browser rather than a bot. There is a Python library called ‘fake_useragent’ that lets you randomly generate a user-agent for each request.
An example of a user-agent:
‘Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.124 Safari/537.36’
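A minimal sketch of rotating user-agents per request. It uses a small hard-coded pool instead of ‘fake_useragent’, whose `UserAgent().random` attribute could supply the string. The first entry is the example above and the second is an illustrative Firefox string. The helper name `browser_headers` and the extra `Accept-Language` header are assumptions.

```python
import random

# Illustrative pool; fake_useragent's UserAgent().random could be used instead.
USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/37.0.2062.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
]


def browser_headers():
    """Return request headers with a randomly chosen browser user-agent."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept-Language": "en-US,en;q=0.9",  # assumption: English content is wanted
    }
```

Usage: `requests.get(url, headers=browser_headers(), timeout=10)`.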
Rotating IPs: it is not enough to fake a user-agent. If all your requests come from the same IP address, the host might block that IP. There is a website called Free Proxy List which lists available proxy IPs. Each request to the host is routed through a proxy IP. These proxy IPs are temporary and the list is refreshed every 10–15 minutes, so you cannot reuse the same proxy IPs every time you run the crawler.
My proxy function crawls the proxy-list website and fetches 100 active proxy IPs to be used for the requests. The proxies at the beginning of the list are the most reliable because they are the most recent. Thus I wrote a weighted sampling function that randomly picks a proxy IP for each request, giving proxies near the top of the list a higher weight than those further down.
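A sketch of such weighted sampling, assuming the proxies have already been scraped into a list of "ip:port" strings ordered most recent first. The geometric decay of the weights (`decay=0.97`) is an illustrative choice, not the post's actual weighting. `random.choices` performs the weighted draw, and the result is formatted as the `proxies` mapping that `requests.get` accepts.

```python
import random


def sample_proxy(proxies, decay=0.97, rng=random):
    """Pick one proxy, favouring entries near the top of the list.

    The i-th proxy gets weight decay**i (an illustrative choice), so the
    first proxy is the most likely and later ones are progressively rarer.
    """
    weights = [decay ** i for i in range(len(proxies))]
    return rng.choices(proxies, weights=weights, k=1)[0]


def as_requests_proxies(proxy):
    """Format "ip:port" as the mapping used by requests.get(..., proxies=...)."""
    return {"http": f"http://{proxy}", "https": f"http://{proxy}"}
```

Usage: `requests.get(url, proxies=as_requests_proxies(sample_proxy(proxy_list)), timeout=10)`.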
Timeouts: it is not advisable to hit a particular domain continuously. For this I enforce a timeout of 1.0 s between consecutive requests to the same domain. Since my application is multithreaded, I will explain later how I handle timeouts with multiple threads.
In my code, a lock protects the last_accessed_time of each domain. Without the lock, one thread might read an outdated last_accessed_time while another thread is in the middle of updating it.
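A minimal single-machine sketch of the idea, assuming a hypothetical `DomainThrottle` class. The check of the last access time and its update happen under a single `threading.Lock`, so two threads cannot both see the domain as free and both send a request.

```python
import threading
import time


class DomainThrottle:
    """Enforce a minimum gap between requests to the same domain (hypothetical helper)."""

    def __init__(self, min_interval=1.0):
        self.min_interval = min_interval
        self._last_accessed = {}          # domain -> time.monotonic() of last request
        self._lock = threading.Lock()     # guards the check-and-update below

    def try_acquire(self, domain):
        """Return True (and record the access) if the domain may be hit now."""
        with self._lock:
            now = time.monotonic()
            last = self._last_accessed.get(domain)
            if last is not None and now - last < self.min_interval:
                return False
            self._last_accessed[domain] = now
            return True

    def wait(self, domain, poll=0.05):
        """Block until the domain's timeout has elapsed, then claim it."""
        while not self.try_acquire(domain):
            time.sleep(poll)
```

This only coordinates threads within one process. Across machines the same check-and-set has to move into shared storage such as Redis.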
Rotating domains: some websites, such as Amazon, have several country domains such as .com, .in, .co.uk and .sg. My application only requires that the crawled content is in English, so my crawler rotates domains on each request to avoid hitting any particular domain continuously. Here too I use weighted sampling, because some domains are more critical for my application.
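One way to combine domain rotation with the per-domain timeout is to sample only among domains whose timeout has already elapsed. The weights below are hypothetical because the post does not list them. `pick_domain` (a hypothetical name) mutates the `last_accessed` dictionary it receives, so in a multithreaded crawler it should be called while holding a lock, for the reasons given in the Timeouts paragraph.

```python
import random
import time

# Hypothetical weights: the post says some domains matter more but gives no numbers.
DOMAIN_WEIGHTS = {"amazon.com": 5, "amazon.in": 3, "amazon.co.uk": 2, "amazon.sg": 1}


def pick_domain(last_accessed, min_interval=1.0, weights=DOMAIN_WEIGHTS,
                now=None, rng=random):
    """Weighted-sample a domain whose per-domain timeout has elapsed.

    Returns None if every domain was hit less than `min_interval` seconds ago.
    Mutates `last_accessed`, so call it while holding a lock in threaded code.
    """
    now = time.monotonic() if now is None else now
    ready = [d for d in weights
             if now - last_accessed.get(d, float("-inf")) >= min_interval]
    if not ready:
        return None
    choice = rng.choices(ready, weights=[weights[d] for d in ready], k=1)[0]
    last_accessed[choice] = now
    return choice
```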
How to crawl in parallel?
There are 2 different ways to crawl URLs in parallel: Multiprocessing and Multithreading. In Python, the ‘multiprocessing’ library supports both (its ‘multiprocessing.dummy’ module exposes the same API backed by threads). So which one should I prefer?
There are pros and cons for both of them.
In Multiprocessing, each request is made in a separate process, which also means that variables are copied into the memory of each process. If the list of URLs or the FIFO Queue is not shared between processes, a copy of the entire list of URLs is made in each process, so memory requirements are much higher with Multiprocessing. Thankfully our FIFO Queue is a shared one.
Multiprocessing also has a higher startup cost per process than threads have. In a sense Multiprocessing is true parallel processing, as each process runs independently (except for shared memory such as the FIFO Queue).
In Multithreading, each request is made in a separate thread (within the same process), which means that variables are NOT duplicated in memory. Threads are not true parallel processing: in CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, and the interpreter switches context between threads.
But the deciding factor for me was that Multithreading suits programs whose bottleneck is I/O, while Multiprocessing suits programs whose bottleneck is CPU time. If most of a program's running time goes into I/O, such as reading from and writing to the network or disk, then threads are preferred: a thread waiting on I/O releases the GIL so that other threads can run, and the cost of context switching between threads is negligible compared to I/O time. With threading we also gain from reduced memory consumption.
Whereas when our program spends most of its time in CPU-bound tasks such as in-memory computations (e.g. building a Trie of words), Multiprocessing is preferable.
In crawling, most of the time goes into fetching the content of an URL i.e. I/O over the network. Thus Multithreading makes more sense in this scenario.
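A small sketch of why threads pay off for I/O-bound work. It uses `concurrent.futures.ThreadPoolExecutor`, a standard-library alternative to the `multiprocessing` thread pool mentioned above. Network latency is simulated with `time.sleep`, which releases the GIL just as a blocking socket read does. `fake_fetch` and `fetch_all` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    """Stand-in for an HTTP GET: sleeps like a network wait (releasing the GIL)."""
    time.sleep(0.2)
    return f"<html>{url}</html>"


def fetch_all(urls, workers=10):
    """Fetch URLs concurrently; results come back in the same order as `urls`."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_fetch, urls))
```

With ten threads, ten simulated 0.2 s fetches finish in roughly 0.2 s instead of about 2 s sequentially. The same code running CPU-bound work would see little or no speedup because of the GIL.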
With Multithreading there is one more important consideration — Handling thread safety.
With Multiprocessing, each process reads and writes its own copy of every variable, so changes do not interfere with the copies in other processes unless a variable is explicitly shared. In Multithreading, by contrast, variables within the same scope are shared by all the threads.
How to implement a locking mechanism for thread safety?
Most use cases that require locking involve a transaction, which some people call GET-and-SET: we update the value of a variable depending on its current value (and possibly on other variables). Let’s take a simple example:
x = x+1
Assume that initially x=10. If we execute the above statement twice sequentially, then afterwards x=12.
Now let’s say we execute it twice in parallel using 2 threads A and B. It might happen that both A and B read x=10 and both compute 10+1. After execution we then have x=11 and not 12, because the update made by one thread is not seen by the other.
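On a single machine, the standard fix is to make the read-modify-write atomic with a lock. Here is a minimal sketch using `threading.Lock`; the helper name `parallel_increment` is hypothetical. Without the `with lock:` line the final value can come out lower than expected, although the race is not guaranteed to show up on every run.

```python
import threading


def parallel_increment(n_threads=2, n_per_thread=100_000, start=10):
    """Run x = x + 1 concurrently from several threads, guarded by a lock."""
    state = {"x": start}
    lock = threading.Lock()

    def worker():
        for _ in range(n_per_thread):
            with lock:                       # read and write happen as one step
                state["x"] = state["x"] + 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["x"]
```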
In crawling, the following scenarios require locking:
- Crawl a URL only if it has not been crawled before, i.e. check the set of crawled URLs and, if the current URL is not in the set, crawl it and add it to the set; otherwise do not crawl it.
- Domain-based timeouts: check the last accessed time of a domain and, if more than the timeout interval has passed since then, crawl a URL from that domain and update the last accessed time to the current time; otherwise do not crawl a URL from that domain.
One standard technique is using Readers-Writers Lock. The idea is pretty simple. We have 2 different kinds of locks — Read Lock and Write Lock.
Whenever we are reading the current value of the variable, we put a Read Lock on it. A Read Lock is only blocked by a Write Lock i.e. if a Write Lock is present on the variable (one thread is updating value of the variable) then we cannot put a Read Lock and have to wait till the write is complete.
But if there are other Read Locks on the same variable, a new Read Lock is not blocked. This means that multiple threads can read the value of a variable simultaneously (unless it is being updated), but only one thread at a time can update it.
A Write Lock has to wait until all Read Locks have been released, i.e. until no thread is reading the variable. This can lead to ‘starvation’, where the Write Lock keeps waiting indefinitely to update a variable because many threads keep reading it.
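Python's standard library has no readers-writers lock, so here is a minimal reader-preferring sketch built on `threading.Condition`; the class name `ReadWriteLock` is hypothetical. Because new readers are admitted whenever no writer is active, it exhibits exactly the writer starvation described above. A writer-preferring variant would also block new readers while a writer is waiting.

```python
import threading
from contextlib import contextmanager


class ReadWriteLock:
    """Reader-preferring lock: many concurrent readers OR a single writer."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._readers = 0        # number of active readers
        self._writer = False     # True while a writer holds the lock

    def acquire_read(self):
        with self._cond:
            while self._writer:              # readers wait only for an active writer
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()      # a waiting writer may proceed

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers > 0:
                self._cond.wait()            # can starve if readers keep arriving
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()

    @contextmanager
    def read_locked(self):
        self.acquire_read()
        try:
            yield
        finally:
            self.release_read()

    @contextmanager
    def write_locked(self):
        self.acquire_write()
        try:
            yield
        finally:
            self.release_write()
```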
How to set up a distributed environment in the cloud?
For cloud deployment I am using AWS EC2 instances. To start with, I created 2 EC2 instances using the Amazon Linux AMI (a RHEL-like distribution) as the OS.
The instance type is ‘t2.micro’, which comes with 1 GB of RAM and 8 GB of disk space. This is probably the cheapest option for my requirements.
With 2 machines, if each machine can run 100 threads, I can now run 200 threads, i.e. download URLs using 200 threads in parallel.
How to handle shared objects and data structures in a distributed system?
For example, if each machine runs its own Bloom filter, that filter is shared by all threads on the same machine, but it is not coordinated with the Bloom filter on the other machine.
Thus the same URL may be crawled on both machines, as they work independently. We want some kind of coordination between the 2 machines.
For this we are going to use Redis in-memory database.
In AWS we can set up a Redis cluster using the ElastiCache service. I have set up a Redis cluster with 3 shards, each with 1 master (primary) and 2 slave (replica) nodes, so each shard’s data is held on 3 nodes (a replication factor of 3). I am using cache.t3.small nodes.
We are going to use Redis (ElastiCache) for all objects shared between threads and processes.
How to efficiently look up visited URLs?
One simple way is to maintain a set of all visited URLs, or of hashes of URLs. Lookup is not an issue with sets, as it is O(1) on average. Assuming 1 billion URLs to be tracked and each hash taking 36 bytes:
sys.getsizeof(hash(URL)) = 36 bytes
Thus the total size of the in-memory data is about 36 × 10^9 bytes ≈ 33.5 GiB, ignoring the overhead of the set itself. This size is expected to increase at a rate of 1 GB per day.
So unless we use a distributed in-memory database to track the visited URLs, the lookup data will not scale.
Another solution is to use a Bloom filter. Bloom filters are probabilistic data structures, meaning they are prone to errors. Their errors are only false positives (never false negatives): a Bloom filter can say that a URL has been visited when it has not.
As a result we might skip crawling some URLs. That is acceptable for some applications but may not be desirable for Google or Bing.
In a Bloom filter we define a contiguous bit array of size ‘m’. The size of the array is generally chosen to be at least 10 times the number of unique URLs to be tracked, and ‘m’ is often chosen to be a large prime number. We also define ‘k’ hash functions H_k:
H_k(x) = (a_k*x + b_k) % m
where a_k and b_k are prime numbers uniquely chosen for each hash function H_k.
There is a trick (double hashing) that requires choosing only two pairs (a_1, b_1) and (a_2, b_2); all k hash functions are then derived from them as follows:
h_1 = (a_1*x + b_1) % m
h_2 = (a_2*x + b_2) % m
H_k(x) = (h_1 + k*h_2) % m
For a given URL, compute the x = hash(URL) and then compute H_k(x) for all k.
This will return k indices in the array, which we set to 1. Initially the array is set to all 0s.
To look up whether a URL is present, compute x = hash(URL) and then the k indices from H_k(x).
If all k indices are set to 1, we say that the URL has been crawled; otherwise we crawl the URL and add it to the Bloom filter.
Note that all k indices for the current URL may already have been set to 1 by other URLs crawled earlier. In that case the Bloom filter says the URL has been crawled even though it has not: a false positive.
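A minimal in-process sketch of this Bloom filter using the double-hashing trick above. Two assumptions differ from a literal reading of the text. First, x is derived from SHA-1 rather than Python's built-in `hash()`, because `hash()` of a string is randomized per process and would give different indices on different machines. Second, the constants a_1, b_1, a_2, b_2 are arbitrary example primes. The class name `BloomFilter` is hypothetical.

```python
import hashlib


class BloomFilter:
    """Bit-array Bloom filter using double hashing: H_i(x) = (h_1 + i*h_2) % m."""

    # Example primes for (a_1, b_1) and (a_2, b_2); illustrative values only.
    A1, B1 = 1_000_000_007, 998_244_353
    A2, B2 = 2_147_483_647, 1_000_000_009

    def __init__(self, m, k):
        self.m = m                              # number of bits
        self.k = k                              # number of hash functions
        self.bits = bytearray((m + 7) // 8)     # all bits start at 0

    def _indices(self, url):
        # Stable 64-bit x: hashlib gives the same value on every machine,
        # unlike the per-process randomized built-in hash().
        x = int.from_bytes(hashlib.sha1(url.encode("utf-8")).digest()[:8], "big")
        h1 = (self.A1 * x + self.B1) % self.m
        h2 = (self.A2 * x + self.B2) % self.m or 1   # avoid a zero step
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, url):
        for i in self._indices(url):
            self.bits[i // 8] |= 1 << (i % 8)

    def __contains__(self, url):
        # True if all k bits are set (possibly a false positive); False if any bit is 0.
        return all(self.bits[i // 8] & (1 << (i % 8)) for i in self._indices(url))
```

In the distributed setup, the bit array would live in Redis instead (SETBIT/GETBIT on string keys, split across several keys because a Redis string is limited to 512 MB), while the index computation stays the same.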
Assuming that ‘m’ is approximately 10 billion and ‘k’=5, with each cell holding either 0 or 1 (a single bit), the size of the Bloom filter is 10 billion bits = 1.25 × 10^9 bytes ≈ 1.16 GiB, i.e. about 3.5% of the memory required by the in-memory set.
The false-positive rate of a Bloom filter is approximately:
(1-e^(-k*n/m))^k, where n = number of URLs inserted
For n=1 billion, m=10 billion and k=5, the false-positive rate is ≈ 0.94%, i.e. roughly 1%.
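The sizing arithmetic above can be checked with a few lines of Python. The last two lines are an addition based on the standard result that k = (m/n)·ln 2 minimizes the false-positive rate. For these numbers that gives k ≈ 7, slightly better than k = 5.

```python
import math

n = 1_000_000_000        # URLs tracked
m = 10_000_000_000       # bits in the Bloom filter
k = 5                    # hash functions

set_bytes = n * 36                          # 36 bytes per hash, ignoring set overhead
bloom_bytes = m / 8                         # one bit per cell
fp_rate = (1 - math.exp(-k * n / m)) ** k

print(f"set:   {set_bytes / 2**30:.1f} GiB")      # 33.5 GiB
print(f"bloom: {bloom_bytes / 2**30:.2f} GiB")    # 1.16 GiB
print(f"ratio: {bloom_bytes / set_bytes:.1%}")    # 3.5%
print(f"false-positive rate: {fp_rate:.2%}")      # 0.94%

k_opt = round(m / n * math.log(2))                # 7
print(f"k={k_opt}: {(1 - math.exp(-k_opt * n / m)) ** k_opt:.2%}")   # 0.82%
```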
How to persist crawled URLs (and metadata) in a database?
The purpose of crawling is to fetch content from webpages and persist it in a database so that we can work with it in the future. A few applications are search engines, graphs of web links, recommendation systems etc.
For our purpose we want a database that can handle large volumes of writes. We do not care much about read performance, as most of the read-heavy workload is performed offline.
For this reason we have chosen Cassandra as the database. Cassandra is suited to workloads with a very high write-to-read ratio.
Here is a brief overview of why Cassandra can handle large volumes of writes.
Cassandra's write path involves 3 components: the Memtable, SSTables and the Commit Log.
Every write to Cassandra is appended to an on-disk, append-only file called the Commit Log and also written to the Memtable, a sorted in-memory structure. Appending to a file on disk is a very fast operation.
When the Memtable reaches a certain size (configured as a fraction of the available memory), it is flushed to disk as an SSTable, and the Commit Log segments covering that data can be discarded. An SSTable (Sorted String Table) is an immutable, sorted key-value file on disk: data written to an SSTable is never updated in place. SSTables written at different times are periodically merged (compaction).
Thus an update to a column associated with a primary key does not overwrite the old value in the SSTable; a new version with the current timestamp is written instead, and older versions are dropped during compaction.
The purpose of the Commit Log is to serve as a backup in case the Memtable is lost, e.g. when a node crashes before flushing. On restart, the Commit Log is replayed to rebuild the Memtable, which is then flushed to an SSTable.
As we can see, all 3 components are optimized for fast writes rather than reads. If we are lucky, a read request is served directly from the Memtable (which is very fast), but once the data has been flushed, a read may have to check several SSTables (per-SSTable Bloom filters help skip irrelevant ones) and merge multiple versions of the same column, returning the one with the latest timestamp.
On AWS we can use Cassandra through the Amazon Keyspaces (for Apache Cassandra) service.
To use Cassandra, we first need to create a keyspace. A keyspace is analogous to a database in relational DBs, and a column family (table) in Cassandra is analogous to a table in relational DBs.
For example, I persist my crawl data in a column family named ‘crawler’.
I am using ‘url_hash’ as the PRIMARY KEY, i.e. each row is identified by a unique url_hash (writing an existing url_hash overwrites that row).
For inserting data into the Cassandra column family, I am using prepared statements, an optimized way of writing data when the same insert/update is executed many times.
Prepared statements avoid having to parse the query every time we call ‘session.execute()’ in the Python Cassandra driver.
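A sketch of the table and a prepared insert using the DataStax Python driver (`cassandra-driver`). It assumes a hypothetical keyspace `crawl_ks` and hypothetical columns `url`, `title` and `crawled_at` alongside the `url_hash` primary key from the post. The `session` argument is a connected driver session (e.g. from `cassandra.cluster.Cluster(...).connect()`); connection and authentication details for Amazon Keyspaces are omitted. The statement is prepared once in the constructor and reused by every `save` call.

```python
import hashlib
from datetime import datetime, timezone

# Hypothetical schema: only url_hash (the PRIMARY KEY) comes from the post.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS crawl_ks.crawler (
    url_hash   text PRIMARY KEY,
    url        text,
    title      text,
    crawled_at timestamp
)
"""

INSERT_PAGE = (
    "INSERT INTO crawl_ks.crawler (url_hash, url, title, crawled_at) "
    "VALUES (?, ?, ?, ?)"
)


def url_hash(url):
    return hashlib.sha256(url.encode("utf-8")).hexdigest()


class PageWriter:
    """Writes crawled pages with one prepared statement reused for every insert."""

    def __init__(self, session):
        self.session = session                       # cassandra.cluster.Session
        session.execute(CREATE_TABLE)
        self.insert = session.prepare(INSERT_PAGE)   # parsed once by the server

    def save(self, url, title):
        # Only the bound values travel with each execute(); inserting an
        # existing url_hash overwrites (upserts) that row.
        self.session.execute(
            self.insert, (url_hash(url), url, title, datetime.now(timezone.utc))
        )
```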
How to start and run your crawling scripts on AWS from your local computer?
For remote command execution on AWS EC2 instances, I am using the Fabric Python library. Basically it is a wrapper over SSH for executing commands and scripts.
To run remote commands on EC2 using Fabric, create a file named fabfile.py in your project directory.
For example, my fabfile.py contains a few such functions.
We define the EC2 hosts at the top of the file and also provide the locations of the EC2 private key (.pem) files on my local machine.
If I run the command from shell ‘fab run_df’ then this command will run in both the EC2 instances and return the result of the command ‘df -h’ from both machines.
Similarly running ‘fab git_pull’ will update the git repositories on my EC2 instances.
The ‘@parallel’ decorator ensures that the command runs in parallel on both machines.
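How do Redis ElastiCache clusters work for distributed systems?
Whenever a key is created in a Redis cluster, its hash slot is computed as CRC16(key) modulo 16384 (if the key contains a hash tag such as {user42}, only the part inside the braces is hashed, which lets related keys share a slot):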
h = CRC16(key) % 16384
The hash slots are split evenly across the shards. For example, with 3 shards as in my case, slots 0 to 5460 are assigned to the 1st shard, slots 5461 to 10922 to the 2nd shard, and the remaining slots (10923 to 16383) to the 3rd shard.
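A pure-Python sketch of the slot computation for illustration. The Redis Cluster specification uses the CRC16 XMODEM variant (polynomial 0x1021, initial value 0). Under the hash-tag rule, only the substring between the first ‘{’ and the next ‘}’ is hashed, provided that substring is non-empty. The function names are hypothetical; a cluster-aware client does this internally.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16 with polynomial 0x1021 and initial value 0 (the XMODEM variant)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot of a key, honouring {hash tags}."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end != -1 and end != start + 1:     # non-empty tag: hash only the tag
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384
```

`CLUSTER KEYSLOT <key>` in redis-cli returns the slot Redis itself computes, which makes a convenient cross-check.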
Each node also knows which slot ranges are owned by which node (IP), effectively a distributed hash table (DHT). A cluster-aware client caches this map, so for each key it computes the slot and sends the request straight to the shard that owns it.
Note that Redis Cluster does not use consistent hashing; it uses a fixed set of 16384 hash slots, and rebalancing means moving whole slots between nodes.
A client that is not cluster-aware cannot use the cluster as a shared in-memory database: when it sends a request for a key to a node that does not own the key's slot, the node replies with a ‘MOVED’ error pointing to the correct node, and such a client does not follow it.
For using Redis clusters, we need a cluster-aware client such as the ‘redis-py-cluster’ Python library (newer versions of ‘redis-py’, from 4.1 onwards, include this as redis.cluster.RedisCluster).
How to create a distributed shared Queue for URLs ?
Distributed FIFO queues in Redis can be created using the RPUSH command for adding an element at the end of the queue and LPOP command for removing an element from the front of the queue.
Since we are using multiple threads to add and remove elements, we want a thread to block until a URL is available in the queue. Thus we use BLPOP, the blocking version of LPOP, instead of LPOP. I am using a timeout of 5 seconds.
So a thread waiting to remove a URL from the front of the queue waits at most 5 seconds; if no URL becomes available within this time, the thread exits.
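A sketch of the producer and consumer sides, assuming a redis-py style client, e.g. `redis.cluster.RedisCluster(host=..., port=6379)`. The client is passed in rather than imported. The key name `crawler:url_queue` and the helper names are hypothetical. `blpop` returns a `(key, value)` pair, or `None` when the timeout expires.

```python
import threading

QUEUE_KEY = "crawler:url_queue"   # hypothetical key name


def enqueue(client, url):
    client.rpush(QUEUE_KEY, url)                 # append at the tail


def worker(client, handle_url, timeout=5):
    """Pop URLs until the queue stays empty for `timeout` seconds."""
    while True:
        item = client.blpop([QUEUE_KEY], timeout=timeout)   # blocks up to `timeout` s
        if item is None:                         # nothing arrived in time: thread exits
            return
        _key, url = item
        if isinstance(url, bytes):               # redis-py returns bytes unless decode_responses=True
            url = url.decode("utf-8")
        handle_url(url)


def start_workers(client, handle_url, n_threads=10):
    threads = [threading.Thread(target=worker, args=(client, handle_url))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    return threads
```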
How to create a distributed locking mechanism?
Earlier we saw how a Readers-Writers Lock can make locking more efficient. But it suffers from 2 drawbacks:
- Write threads can starve waiting for Read threads to release their locks on a resource.
- It is not distributed, i.e. a lock applied to a shared resource by a thread on one machine is not visible to a thread on a different machine.
I am using a customized version of the redlock Python distributed locking algorithm, with Redis as the distributed lock store. The original redlock Python version works only with standalone Redis instances, not with clusters.
The idea is pretty simple:
- Assign a unique key name to a shared resource e.g. FIFO queue
- Assign a unique lock name (e.g. uuid.uuid1()) as the value
- Whenever a thread wants to obtain the lock, it calls the Redis SET command as follows:
- SET ‘resource_name’ ‘lock_name’ NX PX 1000
- This sets the key to the value only if the key does not already exist (NX), and makes the key expire after 1000 milliseconds, i.e. 1 second (PX 1000).
- If another thread tries to SET the ‘resource_name’ key within the PX interval, its SET fails (Redis returns nil) because the key already exists.
- Only when the key has expired or been deleted, and assuming no other thread has acquired it in the meantime, will another thread's SET succeed.
- If a thread is not able to SET the key, it sleeps for a while and then tries again, until it acquires the lock or a timeout is reached.
- To release the lock, the thread holding it deletes (DEL) the key, but only if the key's value is still its own lock_name. This check-and-delete should be atomic (e.g. a small Lua script), so that a thread whose lock has already expired cannot delete a lock now held by another thread.
Redlock uses a quorum: the lock on a ‘resource_name’ key is acquired only if it is successfully set on at least N/2+1 independent Redis instances; otherwise acquisition fails. The implementation assumes that the ‘resource_name’ key is duplicated across those instances.
In a cluster, however, there is only one ‘resource_name’ key, owned by a single shard and reachable from every machine, so no quorum is required.
My code wraps this logic in a class that implements the customized locking mechanism on Redis clusters.
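A minimal sketch of the steps listed above; it is not the author's class. It assumes a redis-py style client (`redis.Redis` or `redis.cluster.RedisCluster`), uses `set(..., nx=True, px=...)` to acquire, and releases by `eval`-ing a small Lua script that deletes the key only if the token still matches. The class name `ClusterLock` and its default parameters are hypothetical.

```python
import time
import uuid

# Delete the key only if it still holds our token; runs atomically on the server.
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""


class ClusterLock:
    """Hypothetical lock on one resource key: SET NX PX to acquire, Lua check-and-DEL to release."""

    def __init__(self, client, resource, ttl_ms=1000, retry_delay=0.05, timeout=5.0):
        self.client = client            # redis-py style client (assumption)
        self.resource = resource        # e.g. "lock:url_queue" (hypothetical name)
        self.ttl_ms = ttl_ms            # PX: lock auto-expires after this many ms
        self.retry_delay = retry_delay  # sleep between attempts, in seconds
        self.timeout = timeout          # give up after this many seconds
        self.token = None

    def acquire(self):
        token = str(uuid.uuid1())       # unique lock_name, as in the post
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            # SET resource token NX PX ttl_ms: succeeds only if nobody holds the key.
            if self.client.set(self.resource, token, nx=True, px=self.ttl_ms):
                self.token = token
                return True
            time.sleep(self.retry_delay)
        return False

    def release(self):
        if self.token is None:
            return False
        deleted = self.client.eval(RELEASE_SCRIPT, 1, self.resource, self.token)
        self.token = None
        return bool(deleted)

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"could not lock {self.resource!r}")
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
```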
How to optimize Redis read/write performance?
There are a few techniques I explored while optimizing Redis performance.
In most cases, performance degrades because of multiple round trips to the Redis cluster, i.e. if I want to set 3 different keys, I call the Redis cluster 3 times and incur 3 times the network round-trip time.
In this case I use Redis pipelines, which batch multiple read and write queries into a single network call.
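A minimal batching sketch, assuming a redis-py style client. Commands on a pipeline are queued locally and only sent when `execute()` is called, which returns one reply per command. On a cluster the client groups the queued commands by owning node, so the cost becomes one round trip per node involved rather than one per key. The `page:title:` key layout and the function name are hypothetical.

```python
def save_titles(client, titles):
    """Write many keys with one pipeline instead of one round trip per key.

    `client` is a redis-py client (redis.Redis or redis.cluster.RedisCluster);
    the "page:title:<url>" key layout is hypothetical.
    """
    pipe = client.pipeline(transaction=False)   # plain batching, no MULTI/EXEC
    for url, title in titles.items():
        pipe.set(f"page:title:{url}", title)    # queued locally, not sent yet
    return pipe.execute()                       # sends the batch, one reply per command
```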
Of course, a plain pipeline cannot express transactional queries whose writes depend on what was read, e.g. push a URL to the queue only if its hash has not been seen before. For such queries, we WATCH the key for changes by other threads and make the transactional statements atomic with the MULTI and EXEC commands.
I use a Redis pipeline this way to add a new URL to the queue.
This is a transaction, because we push the URL to the queue only if the hash of the URL has not been seen before.
We WATCH the hash of the URL, so if any other thread sets the same hash before our EXEC, the current transaction fails and the URL is not added to the queue. This makes sense, because some other thread has already added it.
This is optimistic locking: performance is much better because we do not hold an explicit lock around the transactional statements.
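On a cluster, a WATCH/MULTI/EXEC transaction over several keys requires all of them to be in the same hash slot. The sketch below shows a simpler alternative, not the WATCH approach described above, that gives the same "push only if unseen" guarantee: a single atomic `SET ... NX` on the URL-hash key decides which caller may enqueue the URL. Key names are hypothetical and the client is assumed to be redis-py style. One trade-off is that if the process dies between the two commands, the URL is marked as seen but never queued.

```python
import hashlib

QUEUE_KEY = "crawler:url_queue"     # hypothetical key names
SEEN_PREFIX = "crawler:seen:"


def enqueue_if_new(client, url):
    """Push `url` to the queue unless some thread has already claimed its hash."""
    seen_key = SEEN_PREFIX + hashlib.sha1(url.encode("utf-8")).hexdigest()
    # SET ... NX succeeds for exactly one caller; everyone else gets None back.
    if not client.set(seen_key, 1, nx=True):
        return False
    client.rpush(QUEUE_KEY, url)
    return True
```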
How to maintain the state of crawled and uncrawled URLs for incremental crawling?
All uncrawled URLs are kept in memory in the Redis cluster. Thus, the next time I run the crawler, it pops URLs from the shared queue and continues until the crawl ends or we manually kill the process.
You might wonder what happens if Redis crashes or the instance restarts: all in-memory data (held in RAM) would be lost. Redis does provide ways to persist in-memory data to disk (RDB snapshots and the append-only file, AOF).
How to crawl pages with too many links, e.g. Wikipedia pages? Should we go breadth first, depth first, or use a better technique?
The problem with BFS for crawling is that there could be thousands of URLs on each page, so I would be crawling thousands of URLs at each level of BFS; even after 1 million URLs I could still be only at the 2nd level. The disadvantage is that some Wikipedia pages are more important than others, and links on those pages should be followed before links on less important pages.
BFS assumes equal importance of pages.
With a Depth First Search approach, we follow a single path of links at a time up to a certain level and then backtrack. The advantage is that we are not stuck at the 1st or 2nd level: 1 million crawled links can include URLs from as deep as the 5th or 6th level.
DFS can be implemented easily using LIFO Stack instead of a FIFO Queue.
But one big disadvantage with DFS is that we still do not know which URLs/Pages are more important and how to crawl pages giving priority to important pages.
Assuming that we know some kind of score for each page and that pages with higher scores should be crawled before pages with lower scores. Then instead of BFS or DFS we use a Max Priority Queue which is nothing but a Max Heap data structure, where URLs are popped from the queue based on which of them have the highest score currently.
Python's heapq module implements a Min-Heap, so we insert the negative of the URL score to simulate a Max-Heap.
Unlike O(1) insertion and deletion in a stack or queue, insertion into and deletion from a priority queue take O(log N), where N is the total number of URLs currently in the queue.
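A minimal in-process max-priority frontier built on `heapq`, storing negated scores as described. The class name is hypothetical. An insertion counter breaks ties, so URLs with equal scores come out in insertion order and the URL strings themselves are never compared.

```python
import heapq
import itertools


class MaxPriorityFrontier:
    """Pop the highest-scoring URL first, using heapq (a min-heap) with negated scores."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()    # tie-breaker: FIFO among equal scores

    def push(self, url, score):
        heapq.heappush(self._heap, (-score, next(self._counter), url))   # O(log N)

    def pop(self):
        neg_score, _, url = heapq.heappop(self._heap)                    # O(log N)
        return url, -neg_score

    def __len__(self):
        return len(self._heap)
```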
To implement Priority Queue in Redis I am using Redis Sorted Sets (ZADD). Redis Sorted Sets are not heap data structures but they allow heap operations with same time complexity.
Another advantage of using a priority queue for URLs is that it keeps the size of the queue within limits. For each crawled URL we insert 10–100 new URLs into the queue, so the queue will explode very quickly if we do not truncate it somewhere.
The best way is to keep the URLs with the highest N scores and discard the remaining. I am keeping N=10000.
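A sketch of the Redis sorted-set version with truncation, assuming a redis-py style client and a hypothetical key name. `ZADD` takes a `{member: score}` mapping. `ZREMRANGEBYRANK` with ranks `0` to `-(N+1)` removes everything except the N highest scores, because ranks run from lowest to highest score. `ZPOPMAX` (Redis 5.0+) removes and returns the best entry. The two calls in `push_urls` are not atomic; they could be sent together in a pipeline.

```python
FRONTIER_KEY = "crawler:frontier"   # hypothetical key name
MAX_FRONTIER = 10_000               # keep only the N best URLs, as in the post


def push_urls(client, scored_urls, max_size=MAX_FRONTIER):
    """Add {url: score} entries, then trim the frontier to the top `max_size`."""
    client.zadd(FRONTIER_KEY, scored_urls)
    # Ranks run from lowest to highest score, so removing ranks
    # 0 .. -(max_size + 1) deletes everything except the max_size highest.
    client.zremrangebyrank(FRONTIER_KEY, 0, -(max_size + 1))


def pop_best(client):
    """Remove and return (url, score) with the highest score, or None if empty."""
    popped = client.zpopmax(FRONTIER_KEY)   # list of (member, score) pairs
    return popped[0] if popped else None
```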
How to assign scores to individual pages?
The standard algorithm for scoring pages is PageRank.
PageRank assigns each page a score equal to the probability that a random surfer, who starts from a random page and keeps following links (occasionally jumping to a random page), is on that page in the long run, i.e. in the limit of infinite browsing time.
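A compact power-iteration sketch of PageRank on an in-memory link graph, given as a dict from each page to its outgoing links. The damping factor 0.85 is the commonly used value. Dangling pages (no outgoing links) spread their rank evenly over all pages so the scores keep summing to 1. The function name and iteration count are illustrative.

```python
def pagerank(links, damping=0.85, iterations=50):
    """Power iteration: rank[v] = (1-d)/n + d * (sum of rank[u]/outdeg(u) over u -> v)."""
    nodes = set(links) | {v for targets in links.values() for v in targets}
    n = len(nodes)
    rank = {u: 1.0 / n for u in nodes}
    for _ in range(iterations):
        new = {u: (1.0 - damping) / n for u in nodes}        # random-jump share
        dangling = sum(rank[u] for u in nodes if not links.get(u))
        for u in nodes:
            out = links.get(u)
            if out:
                share = damping * rank[u] / len(out)         # follow a random out-link
                for v in out:
                    new[v] += share
        for u in nodes:
            new[u] += damping * dangling / n                 # dangling pages jump anywhere
        rank = new
    return rank
```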
But the problem with PageRank computation is that we need to crawl web pages before computing the PR score.
It is more suited for ranking webpages or for re-crawling.
As a proxy for PageRank I am using pageview counts for Wikipedia pages, which are available from the Wikimedia pageviews API.
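A sketch of building a request to the Wikimedia REST pageviews endpoint (per-article, daily granularity, dates as YYYYMMDD) and summing the `views` field of the returned `items`. The endpoint layout follows the public API documentation. Restricting the agent to `user` (excluding bots and spiders) is an assumption, as are the function names. Wikimedia asks clients to send a descriptive User-Agent header.

```python
from urllib.parse import quote

PAGEVIEWS_URL = ("https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/"
                 "{project}/all-access/user/{article}/daily/{start}/{end}")


def pageviews_url(title, start, end, project="en.wikipedia"):
    """Build the per-article daily pageviews URL; titles use underscores."""
    article = quote(title.replace(" ", "_"), safe="")
    return PAGEVIEWS_URL.format(project=project, article=article, start=start, end=end)


def total_views(response_json):
    """Sum the daily view counts in the API's JSON response."""
    return sum(item["views"] for item in response_json.get("items", []))
```

Usage: `total_views(requests.get(pageviews_url(title, "20230101", "20230131"), headers={"User-Agent": "my-crawler/0.1 (contact email)"}, timeout=10).json())`.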
We can also use other proxy metrics, such as the last-edited timestamp. Prioritizing recently edited pages makes the queue behave somewhat like an LRU cache.
How to handle re-crawling?
Some applications, such as search engines, need to crawl websites periodically to update their search index, because websites change over time. Some websites change very frequently, whereas some never change.
Pages that are ‘important’ and change frequently should be crawled daily, followed by less ‘important’ pages, and then by pages that are updated less frequently.
One idea is to invalidate the bits set in the Bloom filter for a URL once it is due for re-crawling.
But the problem is that we cannot unset bits in the Bloom filter, because we do not know which URL or set of URLs set each bit; clearing a bit could make other URLs that share it look unvisited.
Instead, we track each visited URL as its own Redis key, created with the SET command and an expiry time (as seen above) inversely related to the importance of the page. Individual members of a Redis Set cannot expire, which is why each URL gets its own key.
Define a fixed interval for re-crawling e.g. 30 days or 2592000 seconds. Then multiply this by exp(-k*score) where k is a constant e.g. k=2 and score is the score (importance) of the page.
Expiration timeout = 2592000*exp(-k*score)
For k=2 and score=0.9, the expiration time is about 5 days (2592000 × e^(-1.8) ≈ 428,000 s), so such a page is re-crawled about 6 times as often as a page with a score close to 0.
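A sketch of the expiry formula and of recording a visit as an expiring key, assuming a redis-py style client. With `ex=` the expiry is given in seconds, and `nx=True` leaves an existing timer untouched. The `crawler:visited:` key prefix and the function names are hypothetical.

```python
import math

BASE_RECRAWL_SECONDS = 30 * 24 * 3600   # 2592000 s = 30 days


def recrawl_ttl(score, k=2.0, base=BASE_RECRAWL_SECONDS):
    """Seconds until a page with this importance score is due again."""
    return int(base * math.exp(-k * score))


def mark_visited(client, url_hash, score):
    """Record a visit as a key that disappears when the page should be re-crawled."""
    return client.set(f"crawler:visited:{url_hash}", 1, ex=recrawl_ttl(score), nx=True)
```

When the key expires, the URL no longer looks visited and becomes eligible to be queued again.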
How to choose the optimum number of threads, Redis and Cassandra instances etc.?
With my current settings, I am running 2 EC2 instances of crawler each with 10 threads.
Most of the in-memory data is offloaded to the Redis cluster or written to Cassandra, and very little is held in the memory of the EC2 instances (which have only 1 GB of RAM each).
On doing a benchmark analysis, around 8 URLs are being fetched per second. This is with very proactive locking of shared resources. This comes out to around 700K URLs per day.
For example, to crawl 1 billion URLs per day I would need around 2860 such EC2 instances (1 billion / 700K per day × 2 instances). This depends on the use case and application; for example, Google crawls 25–50 billion web pages per day, and some web pages they crawl every day.
The code is hosted on my GitHub account.