Cardinality Estimation with Big Data

Abhijit Mondal

Along with membership queries, estimating the number of unique items is one of the most common use cases in Big Data analytics:

  1. Number of unique visitors on your app for the last 1 day, 1 week or 1 month.
  2. Number of unique users that signed up for your website.
  3. Number of unique products purchased through an e-commerce website in the last 1 week.
  4. Number of unique downloads of your app from the play store.
  5. Number of unique Twitter hashtags posted in the last 1 hour.
  6. Number of unique search queries, unique views on YouTube videos, unique views on your Quora answers, etc.

… and many more.

For the remainder of this post, we are going to estimate the number of unique visitors to our website.

Approach 1: COUNT(DISTINCT)

In the first approach, we use a Postgres database to store the stream of ip_addresses. Along with each ip_address we also store a timestamp.

We also index the columns ip_address and timestamp:

CREATE INDEX ON unique_visitors (ip_address);

CREATE INDEX ON unique_visitors (timestamp);

To get the number of unique ip_addresses, we can use a simple SQL query as follows:

SELECT COUNT(DISTINCT(ip_address)) FROM unique_visitors;

To get the number of unique ip_addresses in the last 24 hours, we can use a SQL query as below:

SELECT COUNT(DISTINCT(ip_address)) FROM unique_visitors WHERE timestamp >= NOW() - INTERVAL '24 hours';

With 10 million rows in the table, the first query (the count of unique ip_addresses from the beginning of the app till date) took 1 minute and 7 seconds to complete, while the second query, fetching the count for the last 24 hours, took 487 ms.

On analysing the first query plan, we see that it is a plain sequential scan of all the rows in the table, which is very inefficient.

The second query plan, by contrast, uses the timestamp index to scan only the rows within the given timestamp range.

It is possible to improve query performance by avoiding the COUNT(DISTINCT …) form and instead counting over a sub-query that does the de-duplication:

SELECT count(*) FROM (SELECT DISTINCT ip_address FROM unique_visitors) sub;

The above query takes only around 2 seconds, compared to 1 minute 7 seconds for the COUNT(DISTINCT) variant.

We can see that this version actually leverages the ip_address index.

Instead of a single DB instance, we can have a master and a replica.

There would be one service to insert the incoming IP addresses into the master DB, and another service that would query the count of unique visitors in the last 24 hours from the replica DB every minute and publish the results on an analytics dashboard.

Pros

  • IP addresses are stored in a single data store, so the data is consistent within that store.
  • IP addresses are persisted, so there is no danger of losing data in the event of a system reboot.
  • Data is reusable across changing time windows. For example, if today we count the unique visitors in the last 24 hours and a new requirement comes in to count the unique visitors in the last 48 hours, there is no need to restructure the data; only additional SQL queries are needed to fetch the new reports.
  • The approach is suitable where time windows are small and the pull frequency is modest, e.g. fetching the count of unique visitors in the last 1 hour every 1 minute.

Cons

  • With billions of ip_addresses, the data size would run into a few TBs.
  • Not suitable where we need global unique counts, e.g. unique YouTube video views or unique views on a Quora answer, as these require full table scans.
  • Even with small time windows, if the data volume within those windows is large, it can take a long time to get results; e.g. the unique visitor count on Amazon.com can explode during Black Friday or the Christmas holidays.

Optionally, we can maintain a separate rollup table containing only unique IP addresses for the analytics workload, populated by triggers.

We can backfill the unique IP addresses into this table for the first time using the following SQL query:

INSERT INTO unique_ip_addresses (ip_address, timestamp) SELECT ip_address, MAX(timestamp) FROM unique_visitors GROUP BY ip_address;

We take MAX(timestamp) because if there are multiple entries with the same IP, we consider the last one inserted. Thus, even with the trigger, if a duplicate arrives we update the timestamp to the latest observation. Note that this approach only works for queries over suffix time windows, i.e. the last 1 hour, last 24 hours etc., and not prefix or arbitrary time windows.

Then finding the cardinality is just doing COUNT(*) on the unique_ip_addresses table.

It takes somewhere around 350 ms with about 1 million unique IP addresses.

We can add further optimisation by caching query results on the unique_ip_addresses table.

Assuming that our queries are over a 1-hour interval refreshed every 1 minute: if the unique visitor count for the interval [a, b] is X, then for the next interval [a+1, b+1], one minute later, the unique visitor count would be X + Y - Z, where

Y = unique visitor count between (b, b+1]

Z = unique visitor count between [a, a+1)

Since the intervals (b, b+1] and [a, a+1) are much smaller than [a, b], the time taken to fetch these counts is much shorter.

Observe that the above optimization only works on the unique_ip_addresses table and not on the original table, since on the original table an IP counted in Y or Z could also appear elsewhere inside [a, b], making the counts non-additive. A rough sketch of this incremental update is shown below.
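Assuming the psycopg2 driver and the unique_ip_addresses table above (the helper names here are purely illustrative), the incremental update might look like this, where a and b are datetimes and step is a one-minute timedelta:

import psycopg2

COUNT_SQL = """
    SELECT COUNT(*) FROM unique_ip_addresses
    WHERE timestamp >= %s AND timestamp < %s;
"""

def count_between(cur, start, end):
    # Count of unique IPs whose latest timestamp falls in [start, end)
    cur.execute(COUNT_SQL, (start, end))
    return cur.fetchone()[0]

def next_window_count(cur, prev_count, a, b, step):
    # X + Y - Z: add the minute that just arrived, drop the minute that expired
    y = count_between(cur, b, b + step)   # approximately (b, b+1]
    z = count_between(cur, a, a + step)   # approximately [a, a+1)
    return prev_count + y - z

The half-open boundaries here are an approximation of the (b, b+1] and [a, a+1) intervals above; in practice the exact boundary convention just has to be applied consistently.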

There is one obvious drawback with this rollup table: events that are close in time may not be close together in the table. Unlike the original table, its rows are not ordered by timestamp, because whenever we see a duplicate IP address we update its timestamp in place, so the physical ordering no longer matches the timestamp ordering.

Approach 2: LinkedHashMap (LHM)

In the second approach, we can use a LinkedHashMap (LHM) to store the IP addresses.

Whenever a user's IP address is tracked, we first check whether it already exists in the LinkedHashMap. If it exists, it is deleted and re-inserted at the head; otherwise it is simply inserted at the head.

This process ensures that the LinkedHashMap:

  1. Contains only unique IP addresses
  2. Maintains recency ordering, i.e. entries are ordered by last-seen timestamp, most recent at the head and oldest at the tail.

In order to maintain the constraint that the LinkedHashMap contains only IP addresses seen in the last 1 hour, we delete entries from the tail until the timestamp of the entry at the tail falls within the last 1 hour of the current timestamp.

We need to perform this eviction each time we call the API to get the current size of the LinkedHashMap. We can optionally also do it when adding a new IP address.

Assuming we want to count the number of unique visitors in the last 1 hour at every 1-minute interval, we can poll the LinkedHashMap every minute and flush any entries at the tail that are older than 1 hour.

At this point, we return the size of the LinkedHashMap.

I tried out an implementation of the above approach in Python:
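A minimal sketch of the idea, using Python's OrderedDict (class and method names here are illustrative), could look like this:

import time
from collections import OrderedDict

class UniqueVisitorWindow:
    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.entries = OrderedDict()   # ip_address -> last-seen timestamp

    def add(self, ip_address, ts=None):
        ts = ts if ts is not None else time.time()
        # Delete and re-insert so the IP moves to the end of the dict,
        # which plays the role of the "head" described above
        self.entries.pop(ip_address, None)
        self.entries[ip_address] = ts

    def _evict_stale(self, now=None):
        now = now if now is not None else time.time()
        # The oldest entry sits at the front of the OrderedDict (the "tail");
        # evict from there until everything left is within the window
        while self.entries:
            ip, ts = next(iter(self.entries.items()))
            if now - ts <= self.window:
                break
            self.entries.popitem(last=False)

    def unique_count(self):
        self._evict_stale()
        return len(self.entries)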

We can simulate a stream of IP addresses using the below implementation:

  • Create a process to fetch the current size of the LHM and write it to a file.
  • Create 5 parallel processes to write the IP addresses into the LHM.
  • Both read and write processes require locks because during reading we are also removing ‘stale’ data from the tail of the LHM.
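A rough version of this simulation, reusing the UniqueVisitorWindow sketch above (for simplicity it uses threads and a single shared lock rather than five separate processes):

import random
import threading
import time

window = UniqueVisitorWindow(window_seconds=3600)
lock = threading.Lock()

def writer(n_events=10_000):
    # Simulates one stream of visitors hitting the site
    for _ in range(n_events):
        ip = f"10.0.{random.randint(0, 255)}.{random.randint(0, 255)}"
        with lock:
            window.add(ip)

def reader(interval=1, iterations=3):
    # Periodically reads the current unique-visitor count and logs it
    with open("unique_counts.log", "a") as f:
        for _ in range(iterations):
            with lock:                 # eviction mutates the LHM during reads
                count = window.unique_count()
            f.write(f"{time.time()},{count}\n")
            time.sleep(interval)

threads = [threading.Thread(target=writer) for _ in range(5)]
threads.append(threading.Thread(target=reader))
for t in threads:
    t.start()
for t in threads:
    t.join()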

In a real application, the LHM would be used in conjunction with a database, because the LHM only stores the unique IP addresses for a given time window. If we want to cater to more time windows, we can add multiple LHM objects.

Ideally there would be a message queue such as Kafka ingesting the IP addresses; the LHM would consume the data from one topic and the database would be written to from another. The LHM would be used purely for analytics workloads.

In distributed applications, we can use Redis to implement an LHM-like structure.
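One way to get similar behaviour in Redis is a sorted set keyed by last-seen timestamp. The sketch below assumes the redis-py client and a locally running Redis instance; the key name is illustrative:

import time
import redis

r = redis.Redis()
KEY = "unique_visitors:last_hour"   # illustrative key name
WINDOW = 3600                       # window size in seconds

def track(ip_address):
    # ZADD overwrites the score of an existing member, which mirrors the
    # "delete and re-insert at the head" step of the LHM
    r.zadd(KEY, {ip_address: time.time()})

def unique_count():
    # Drop members whose last-seen time has fallen outside the window
    r.zremrangebyscore(KEY, 0, time.time() - WINDOW)
    return r.zcard(KEY)

Strictly speaking a sorted set is not a linked hash map, but it gives the same unique-members-ordered-by-recency behaviour with logarithmic-time updates.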

Optionally, instead of discarding the tail entries of the LHM when they go 'stale', we can flush those IP addresses into the unique-IP-address table we saw in Approach 1. In that case we no longer need the triggers.

Pros

  • Queries are faster compared to using only the DB, because we store only the unique IP addresses and serve them directly from RAM.
  • We can have multiple LHM objects for different time windows.
  • No need for database triggers, as the LHM can take their place.

Cons

  • Instead of a single database, the IP address data now lives in two different data sources, which might lead to inconsistency at some point, though it would most likely be an eventually consistent system.
  • To support additional time windows, we need to create separate LHM objects in memory, which would consume a lot of RAM.
  • The approach is good for smaller time windows and frequent access patterns. If the time windows are huge, or we want the cardinality over the entire duration, all the data would have to be held in memory.
  • We cannot combine LHMs with smaller time windows to get unique counts for larger time windows, because the IP addresses in them might overlap.

Approach 3: HyperLogLog (HLL)

In most big data applications that require computing a cardinality, the exact value may not be needed and we can live with a small error rate. For example, for a site like Facebook or Twitter, where millions of users turn up every day and each user performs multiple actions, reporting the number of unique visitors with a 2–3% error rate will not drastically impact analytics.

The idea behind HLL is to use a probabilistic data structure to estimate the cardinality with a very low memory footprint and fast in-memory queries.

Given a stream of integers, if we have seen n integers so far and each integer is equally likely, then approximately n/2 of them will end with ‘1’ in the binary representation, n/4 of them will end with ‘10’, n/8 of them will end with ‘100’ and so on.

Thus, approximately n/2^(R+1) of the integers end with exactly R 0's in their binary representation.

We can approximate the cardinality to be 2^(R+1) if the maximum number of trailing 0's observed is R. For example, for 4-bit integers with R=2, we might have observed the patterns 0001, 0011, 0101, 0111, 1001, 1011, 1101, 1111, 0010, 0110, 1010, 1110, 0100 and 1100, but not 0000 or 1000, because those two end with more than two 0's.

But it is entirely possible that we have only observed 0100 or 1100 and none of the remaining patterns.

But assuming that all patterns are equally likely, we have most likely seen all 14 of the above patterns, and we approximate the cardinality as 2⁴ = 16 (the same order of magnitude as the 2^(R+1) = 8 suggested by the rule above).

The number of possible binary patterns for M-bit integers (here M = 32) ending with exactly R 0's is 2^(M-(R+1)). The probability that 'n' integers drawn without replacement all come from this set is:

P1 = C(2^(M-(R+1)), n) / C(2^M, n)

This probability should be close to the observed probability of seeing R trailing 0's, which is:

P2 = G/N, where N is the total number of IP addresses seen (including duplicates) and G = counts[R], with counts[i] being the number of IP addresses whose binary representation ends with exactly i 0's.

We can then estimate 'n' by setting P1 = P2.

In HLL, we estimate the cardinality to be n = constant*2^R.

For any data type, we apply the MurmurHash3 hash function to obtain a 32-bit integer, which is used for the cardinality estimation. For each hashed value, we compute the number of trailing 0's in its binary representation and track the maximum. A single maximum is prone to error: as seen above, one unlucky value with many trailing 0's near the beginning of the stream would make us overestimate heavily.
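As a toy illustration with purely synthetic data, the single-maximum estimate for a set of random 32-bit values can be computed as follows:

import random

def trailing_zeros(x, bits=32):
    # Number of trailing 0's in the binary representation of x
    n = 0
    while n < bits and x & 1 == 0:
        n += 1
        x >>= 1
    return n

values = {random.getrandbits(32) for _ in range(100_000)}
R = max(trailing_zeros(v) for v in values)
print(len(values), 2 ** (R + 1))   # true distinct count vs. noisy estimate

Running this a few times shows estimates that can easily be off by a factor of 2 or more in either direction, which is exactly why a single observable is not enough.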

In order to decrease the estimation error, multiple independent hash functions are generally used and the maxima are averaged:

n = constant * 2^((R1 + R2 + … + Rm)/m)

where Ri is the maximum number of trailing 0's seen with the i-th hash function and m is the total number of hash functions.

To avoid the cost of computing m hash functions, a single hash function is used instead, but m counters (registers) are tracked. The hash value is partitioned into a quotient and a remainder, where the remainder is the last p bits of the binary representation and the quotient is the remaining 32-p bits:

Quotient = 32-bit hash/(2^p)

Remainder = 32-bit hash % (2^p)

Python code for computing HLL using averaging (uses some calculations from the original implementation):
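A minimal sketch along these lines follows. It assumes the third-party mmh3 package for MurmurHash3, and it uses the harmonic-mean estimate from the HyperLogLog paper in place of the simple average above, ignoring the small- and large-range bias corrections for brevity; class and variable names are illustrative.

import mmh3

class HyperLogLog:
    def __init__(self, p=10):
        self.p = p                       # bits used for the register index
        self.m = 1 << p                  # number of registers (1024 for p = 10)
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def _rank(self, w):
        # 1 + number of trailing zeros of the (32 - p)-bit quotient
        rank = 1
        while w & 1 == 0 and rank <= 32 - self.p:
            rank += 1
            w >>= 1
        return rank

    def add(self, item):
        h = mmh3.hash(str(item)) & 0xFFFFFFFF   # unsigned 32-bit hash
        idx = h & (self.m - 1)                  # remainder: last p bits -> register index
        w = h >> self.p                         # quotient: remaining 32 - p bits
        self.registers[idx] = max(self.registers[idx], self._rank(w))

    def count(self):
        # Harmonic-mean estimate: alpha * m^2 / sum(2^-register)
        z = sum(2.0 ** -r for r in self.registers)
        return int(self.alpha * self.m * self.m / z)

hll = HyperLogLog(p=10)
for i in range(1_000_000):
    hll.add(f"ip-{i}")      # synthetic items standing in for IP addresses
print(hll.count())          # roughly 1,000,000, give or take ~3%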

In the above code we keep an array of size 2^p = 1024, and each value in the array is a small integer of at most about 23 (since only 22 of the 32 bits remain after reserving the last 10 bits for the register index), so we need only 5 bits to store each register.

To estimate the cardinality of 10 million strings, the memory required is only 1024 × 5 bits ≈ 0.6 KB.

The error in estimation varies somewhere around 2–3%, which is close to the theoretical error rate of 1.04/sqrt(1024) ≈ 3.2%.

HyperLogLog is most suitable in cases where we want global cardinality i.e. cardinality from start of time.

In order to use HLL for time-window-based queries, we can create an array of HLLs indexed by timestamp. Let this array of HLLs be denoted A.

A[i] = (timestamp, HLL) where HLL is an implementation similar to above and ‘timestamp’ is the timestamp of the first element added to this HLL.

Although this is flexible, let's assume that we create a new HLL in A every minute, so A[i+1].timestamp - A[i].timestamp ≤ 1 minute.

Now let's say that we want to find the number of unique visitors in the interval [a, b).

Then, using binary search, we find the smallest index i such that A[i].timestamp ≥ a and the largest index j such that A[j].timestamp < b. We then merge all the HLLs in this range as follows:

HLL[i:j][r] = max(A[k].HLL[r] for k = i … j), for each register r = 0, 1, …, 1023
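With the HyperLogLog sketch from earlier, merging a range of HLLs reduces to taking the element-wise maximum of their registers; a rough helper, assuming they all share the same p, might be:

def merge_hlls(hlls):
    # The register-wise max of HLLs built over several streams is exactly
    # the HLL one would have obtained from the union of those streams
    merged = HyperLogLog(p=hlls[0].p)
    for h in hlls:
        merged.registers = [max(a, b) for a, b in zip(merged.registers, h.registers)]
    return merged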

To find the count of unique visitors in the last 60 minutes, we can create 6 different arrays:

A — with HLLs for every 1 minute

B — with HLLs for every 2 minutes

C — with HLLs for every 4 minutes

D — with HLLs for every 8 minutes

E — with HLLs for every 16 minutes

F — with HLLs for every 32 minutes

Then, assuming the current time is t:

x1 = F[t-32, t), x2 = E[t-48, t-32), x3 = D[t-56, t-48), x4 = C[t-60, t-56)

Then merge x1, x2, x3 and x4.

We could also simply merge A[t-60, t), but then there would be 60 HLLs to merge, compared to only 4 in the above case. A rough sketch of this bucketed lookup is given below.
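The sketch below assumes the bucket boundaries are aligned with the current minute t and a (purely illustrative) buckets structure mapping a window size in minutes to a dict of {bucket_start_minute: HLL}:

def last_hour_hll(buckets, t):
    # Greedily cover [t-60, t) with the largest buckets first:
    # 32 + 16 + 8 + 4 = 60 minutes, i.e. F, E, D and C from above
    pieces, end = [], t
    for size in (32, 16, 8, 4):
        start = end - size
        pieces.append(buckets[size][start])
        end = start
    return merge_hlls(pieces)

The estimated unique-visitor count for the last hour is then last_hour_hll(buckets, t).count().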

Pros

  • With a very small amount of memory (about 1–2 KB) we can approximate cardinalities up to 1 billion.
  • For distributed applications, we can have one HLL instance running on each server behind the load balancer. For the final result, we merge the registers from each instance by taking their element-wise maximum. This is really fast, since the HLLs run in parallel and no locking is required.
  • Unlike LHM and database approaches, we can merge multiple HLLs to get cardinality over wider time windows.
  • In-memory computation is really fast as compared to database queries.

Cons

  • Cardinality estimations generally have an error of around 2–3%.
  • Not well suited to applications with low cardinalities, such as the number of unique visitors on your blog or on a newly launched app/website, where exact counts are easy to compute anyway.

References:

  1. https://engineering.fb.com/2018/12/13/data-infrastructure/hyperloglog/
  2. http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
  3. https://thoughtbot.com/blog/hyperloglogs-in-redis
  4. https://riak.com/posts/technical/what-in-the-hell-is-hyperloglog/index.html?p=13169.html
  5. https://www.citusdata.com/blog/2017/06/30/efficient-rollup-with-hyperloglog-on-postgres/
