System Design — Experimentation Platform (XP)

Functional Requirements

  • Users of the platform must be able to create, update and delete experiments and configurations.
  • Users of the platform must be able to define their own metrics.
  • Users of the platform must be able to define the variants, as well as the number of variants, for each experiment.
  • Users of the platform must be able to define allocation of resources for each experiment.
  • Users of the platform must be able to define the tests for comparing variants in an experiment.

Non-Functional Requirements

  • Durability: Once created, an experiment and its data, metrics, tests and configurations should not be lost or corrupted.
  • Low Latency: Clients should neither notice nor be slowed down by the experiments.
  • Availability: The system should be highly available.

Traffic and Throughput Requirements

  • Average QPS = 1 million
  • Number of experiments running at any instant = 1000
  • Average number of variants per experiment = 2
  • Total number of clients = 1 billion

The “On My Computer” Approach

What are the essential steps for setting up the XP platform?

  1. Request Routing: Based on the request parameters and environment variables such as user id, country, post id etc., an API request is routed to at most one variant for each experiment associated with an application. E.g. if the application is “Friend Suggestions”, the experiments could be “Number of suggestions”, “Size of images”, “Horizontal or vertical scrolling”, “Recommendation algorithm” etc. Based on the user id, we can direct user ‘ABC123’ to, say, number of suggestions=10, size of image=160x100 etc.
  2. Response Tracking: Based on the actions taken by clients, we can track a suitable response against an experiment in each application. E.g. user id ‘ABC123’ clicked 3 recommendations.
  3. Running Analysis: Run analysis on the results of each experiment to compare its variants and decide which variant, if any, should be productionized.
  4. Productionizing: Once we have decided whether to productionize and which variant to pick, the last step is to make that variant available to all users instead of a random subset of users.

How to set up request routing?

  • For an experiment X, deploy the variants V1, V2, … Vn. Let's assume the variants are deployed as micro-services in a distributed environment.
  • Create a table to map experiments and variants to a URL and a port number. Add a boolean column to identify whether the variant represents the NULL hypothesis or not.
xp_variant_mappings: 
experiment_id, variant_id, ip_address, port, url, is_null_hypothesis
  • Create another table to store experiment configs.
xp_configs: 
experiment_id, config_name, config_value

Configs could be as follows:

1. alpha
2. power
3. minimum_detectable_effect
4. minimum_sample_size
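
As a rough sketch of the two tables above, the snippet below creates them in an in-memory SQLite database and looks up a variant's endpoint and a config value. SQLite is only a stand-in for illustration; the column types, the sample rows and the helper names `get_variant_endpoint` and `get_config` are assumptions, not part of the design above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real database
conn.executescript("""
CREATE TABLE xp_variant_mappings (
    experiment_id TEXT, variant_id INTEGER, ip_address TEXT,
    port INTEGER, url TEXT, is_null_hypothesis INTEGER,
    PRIMARY KEY (experiment_id, variant_id)
);
CREATE TABLE xp_configs (
    experiment_id TEXT, config_name TEXT, config_value TEXT,
    PRIMARY KEY (experiment_id, config_name)
);
""")

# Hypothetical sample rows for experiment 'ABC123'.
conn.executemany(
    "INSERT INTO xp_variant_mappings VALUES (?, ?, ?, ?, ?, ?)",
    [("ABC123", 0, "10.0.0.1", 8080, "/suggestions", 1),
     ("ABC123", 1, "10.0.0.2", 8080, "/suggestions", 0)],
)
conn.executemany(
    "INSERT INTO xp_configs VALUES (?, ?, ?)",
    [("ABC123", "alpha", "0.05"), ("ABC123", "power", "0.8")],
)


def get_variant_endpoint(experiment_id, variant_id):
    """Return the URL a request should be forwarded to, or None if unknown."""
    row = conn.execute(
        "SELECT ip_address, port, url FROM xp_variant_mappings "
        "WHERE experiment_id = ? AND variant_id = ?",
        (experiment_id, variant_id),
    ).fetchone()
    return None if row is None else f"http://{row[0]}:{row[1]}{row[2]}"


def get_config(experiment_id, config_name):
    """Return the stored config value as a string, or None if it is not set."""
    row = conn.execute(
        "SELECT config_value FROM xp_configs "
        "WHERE experiment_id = ? AND config_name = ?",
        (experiment_id, config_name),
    ).fetchone()
    return None if row is None else row[0]
```

Returning None for an unknown variant lets the caller fall back to treating the request as a normal, non-experiment request.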
  • In order to store filters for an experiment, e.g. when we want to run an experiment only for ‘US’-based users, one way is to create a table with the following fields:
xp_filters: 
experiment_id, db_name, table_name, attribute, value

E.g. if we want to add a filter on Country=“US” from the “users” table under the DB “social_data”:

INSERT INTO xp_filters (experiment_id, db_name, table_name, attribute, value) VALUES ('ABC123', 'social_data', 'users', 'Country', 'US')

Another approach would be defining raw SQL queries against an experiment id to fetch the data. We can either store the SQL query as a string in the database or write it in an .sql file <experiment_id>-filter.sql.

ABC123-filter.sql

SELECT id FROM users
WHERE users.Country = 'US' AND
users.id IN (
    (SELECT src_user_id FROM friends GROUP BY src_user_id HAVING COUNT(src_user_id) >= 100)
    UNION
    (SELECT author_id FROM posts GROUP BY author_id HAVING COUNT(author_id) >= 100)
);

SELECT id FROM posts
WHERE posts.id IN (
    SELECT post_id FROM likes GROUP BY post_id HAVING COUNT(post_id) >= 5
);

... and so on.

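The ids returned by these queries can then be cached in in-memory hash maps, one per entity type and experiment:

HashMap A: “users_<expt-id>”: [list of enabled user_ids]

HashMap B: “posts_<expt-id>”: [list of enabled post_ids]

… and so on.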
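
A minimal sketch of how a routing service might consult such hash maps. The map contents and the `is_enabled` helper are illustrative assumptions; sets are used instead of lists so that each membership check takes constant time.

```python
# Hypothetical in-memory caches, keyed as "<entity>_<expt-id>".
enabled = {
    "users_ABC123": {"u1", "u2", "u3"},
    "posts_ABC123": {"p10", "p11"},
}


def is_enabled(entity, experiment_id, entity_id):
    """Return True if entity_id passed the filter for this experiment."""
    return entity_id in enabled.get(f"{entity}_{experiment_id}", set())
```

An experiment with no cached entry is treated as having no enabled ids, so unknown experiments never capture traffic by accident.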

  • In order to define the split criteria for each experiment, we can define a method in a script file that will take the request parameters and env variables and will return a split identifier.

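E.g. the split criteria are ‘user_id’ and ‘Country’:

ABC123-split.py

import hashlib

dao = DBObject()  # data-access object, assumed to be defined elsewhere

def get_split_id(request, environment):
    user_id = str(request.get("user_id"))
    country = dao.get_user_country(user_id)
    # hashlib gives the same value in every process; built-in hash() of a str does not
    return int(hashlib.sha256((user_id + country).encode()).hexdigest(), 16)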

Instead of making frequent DB requests to fetch “country” using the user_id, we can cache the country results in an LRU cache.
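
One possible way to get such a cache in Python is `functools.lru_cache`. In the sketch below, the `_USERS` dictionary and `fetch_country_from_db` are hypothetical stand-ins for the real users table and DB call, and the cache size is an arbitrary assumption.

```python
from functools import lru_cache

# Hypothetical stand-in for the users table.
_USERS = {"u1": "US", "u2": "IN"}


def fetch_country_from_db(user_id):
    """Pretend DB lookup; a real implementation would query the database."""
    return _USERS.get(user_id)


@lru_cache(maxsize=100_000)  # least recently used entries are evicted beyond this size
def get_user_country(user_id):
    return fetch_country_from_db(user_id)
```

`get_user_country.cache_info()` reports hits and misses, which is a convenient way to check how many DB calls the cache is saving. Note that cached values go stale if a user's country changes, so a real deployment would need some expiry or invalidation policy.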

  • Lastly, we need a way to assign each split_id produced above to at most one variant for each experiment. One possible way is to create a table mapping the split_id to the variant id.
xp_resource_mapping:
split_id, variant_id, experiment_id

Since the mapping must be consistent, i.e. once a split_id is assigned to a variant it stays assigned to that variant in all future sessions, we can also compute the assignment on the fly from a hash of the split_id instead of storing it:

1. Map the split_id to a number R between 0 and 1 using a hash function, so that R is uniformly distributed across split_ids but always the same for a given split_id.
2. If R <= 0.7 assign the current split_id to variant 0, else assign it to variant 1.

For more generic scenarios:

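import bisect
import hashlib
from itertools import accumulate

class Assignment:
    def __init__(self, proportions):
        # Running totals, e.g. [0.7, 0.3] -> [0.7, 1.0]
        self.cumulative_frq = list(accumulate(proportions))

    def assign(self, split_id):
        # Map split_id to a repeatable number in [0, total) so that the
        # same split_id always lands in the same variant.
        digest = hashlib.sha256(str(split_id).encode()).hexdigest()
        r = int(digest[:8], 16) / 2**32 * self.cumulative_frq[-1]
        # Binary search for the first cumulative value greater than r
        i = bisect.bisect_right(self.cumulative_frq, r)
        return min(i, len(self.cumulative_frq) - 1)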
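
A quick way to sanity-check a hash-based assignment scheme is to verify that it is repeatable and that the observed split is close to the requested proportions. The sketch below is self-contained and makes its own assumptions: SHA-256 as the hash, the first 32 bits of the digest as the source of randomness, and synthetic ids of the form `user-<n>`.

```python
import hashlib
from collections import Counter


def stable_unit_interval(split_id):
    """Map a split_id to a repeatable number in [0, 1)."""
    digest = hashlib.sha256(str(split_id).encode()).hexdigest()
    return int(digest[:8], 16) / 2**32


def assign_variant(split_id, proportions):
    """Pick the variant whose cumulative share first exceeds the hashed value."""
    r = stable_unit_interval(split_id) * sum(proportions)
    cumulative = 0.0
    for variant, share in enumerate(proportions):
        cumulative += share
        if r < cumulative:
            return variant
    return len(proportions) - 1  # guard against floating-point rounding


# Assign 10,000 synthetic ids with a 70/30 split and measure the outcome.
counts = Counter(assign_variant(f"user-{i}", [0.7, 0.3]) for i in range(10_000))
share_of_variant_0 = counts[0] / 10_000
```

In practice the experiment id would likely be mixed into the hashed key as well, so that the same user does not land in the same bucket position across every experiment.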
  • In order to track the requests, we should log each request and its parameters in append-only log files. Some fields that we can track in the log file are:
    experiment_id, variant_id, user_id, page_id, post_id, last_accessed_timestamp etc.
    This helps to track whether a user has seen a recommendation, how long ago it was shown, whether it is being shown multiple times, and so on.

How to do response tracking?

  • After a user takes an action, it is inserted into a Kafka topic for asynchronous processing.
  • Kafka consumers will read the action messages from this topic and write each action into an append-only log file.
  • The log files would be sharded by <experiment_id>-<date>-<hour>-<minute>-<second>, i.e. there would be one log file for each second.
  • The message format would include fields such as variant_id, action (click, view, scroll, etc.), timestamp, etc. It could be CSV. The exact format and fields would be defined by the experimenter.
  • In order to handle CDC (Change Data Capture), we must maintain a table that tracks up to which line number each log file has been processed by the cronjob. Further processing resumes after that line number.
cdc_log_files:
experiment_id, file_name, processed_line_number

If we encounter an error at a particular line number, we do not skip it, since the log is a record of the event stream and skipping a line would mean missing an event. Instead we retry with exponential backoff and, if the error persists, resolve it manually.

  • For each experiment, create a shell script that would do roughly the following:
./ABC123_analytics.sh <log_file_name> <start_line_number>

1. Read <log_file_name> starting from <start_line_number>.
2. For each line, do preprocessing steps such as converting CSV to an array etc.
3. Aggregate the results over multiple lines if required, e.g. CTR for each day or number of clicks for each user id.
4. Write/update the results in another Postgres database.
5. Update the processed_line_number for <log_file_name> for the next iteration.
  • Create background jobs using schedulers such as Airflow or cronjob, which would periodically execute the shell scripts.
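
The incremental processing described above can be sketched in a few lines of Python. This is an illustration under assumptions rather than the actual script: the log is taken to be CSV with the columns variant_id, action, timestamp, the only aggregate is clicks per variant, and the log is passed in as a list of lines instead of being read from a file.

```python
import csv


def process_log(lines, start_line_number):
    """Aggregate clicks per variant from the lines after start_line_number.

    Returns (clicks_per_variant, new_processed_line_number).
    """
    clicks = {}
    processed = start_line_number
    for line_number, line in enumerate(lines, start=1):
        if line_number <= start_line_number:
            continue  # already handled in an earlier run
        variant_id, action, _timestamp = next(csv.reader([line]))
        if action == "click":
            clicks[variant_id] = clicks.get(variant_id, 0) + 1
        processed = line_number
    return clicks, processed


# Hypothetical log contents: the first run sees two lines, the second run sees four.
log = [
    "0,view,1700000000",
    "0,click,1700000001",
    "1,click,1700000002",
    "1,click,1700000003",
]
first_run = process_log(log[:2], 0)
second_run = process_log(log, first_run[1])
```

The second run starts from the line number returned by the first, so lines 1 and 2 are not counted twice. The returned line number would be written back to cdc_log_files only after the aggregates have been stored successfully.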

How to run the analysis?

There could be different business metrics for different experiments, for e.g.:

  • For Feed Ranking we can use Mean Average CTR. E.g. if the user interacted with the 1st, 3rd and 7th feed items out of 10, the score is 1/10*(1/1+2/3+3/7)=0.21, whereas if they had interacted with the 1st, 2nd and 3rd items, the score would be 1/10*(1/1+2/2+3/3)=0.3.
  • For Friend Suggestions we can use Average CTR. E.g. if the user clicked 3 recommendations out of 10, the score is 0.3, whereas if they had clicked 5 recommendations, the score would have been 0.5.
  • For a Signup form, we can track the conversion rate. If a user signed up, assign the label 1, else 0. Then compute the ratio of the number of users with label 1 to the number of users shown the form.
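
The three metrics above can be written as small Python functions, which also reproduces the worked numbers. The function names are assumptions made for this sketch.

```python
def mean_average_ctr(clicked_positions, num_items):
    """Sum of (clicks so far / position) over the clicked positions, divided by num_items."""
    ordered = sorted(clicked_positions)
    total = sum(rank / position for rank, position in enumerate(ordered, start=1))
    return total / num_items


def average_ctr(num_clicked, num_shown):
    """Fraction of shown recommendations that were clicked."""
    return num_clicked / num_shown


def conversion_rate(labels):
    """labels holds 1 for each user who signed up and 0 for each user who did not."""
    return sum(labels) / len(labels)
```

For example, `mean_average_ctr([1, 3, 7], 10)` rounds to 0.21 and `mean_average_ctr([1, 2, 3], 10)` to 0.3, matching the feed ranking example: clicks near the top of the feed score higher than the same number of clicks further down.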

Also depending on the type of metrics, we need to use different testing strategies, for e.g.

  • If the metric is CTR per day then we can use Z-test to compare multiple variants using the mean and standard deviation of the CTR.
  • If the metric is whether a user converted or not, then we can use Chi-Square test to compare the variants.
  • There are other testing strategies for comparing variants, such as Student’s t-test, the Kolmogorov-Smirnov test, bootstrapping, the Delta method etc.

To run the tests, we create another script that will read the data from the Postgres DB and do the calculations:

./ABC123_tests.py

1. Make a connection to the Postgres DB.
2. Read all rows from the DB table for the experiment_id.
3. Group the rows by variant_id.
4. Compare the groups using either a Z-test or a Chi-Square test and return the p-value.
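
As an illustration of the last step, here is a minimal two-sided z-test for the difference between two conversion rates, using only the standard library. The function name and the input format (conversions and sample size per variant) are assumptions; a real script would more likely rely on a statistics library.

```python
import math


def two_proportion_z_test(conv_a, n_a, conv_b, n_b):
    """Two-sided z-test for a difference in conversion rates.

    For a 2x2 table, z**2 equals the Pearson chi-square statistic
    (without continuity correction), so both tests give the same p-value.
    """
    p_a, p_b = conv_a / n_a, conv_b / n_b
    pooled = (conv_a + conv_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))  # equals 2 * (1 - Phi(|z|))
    return z, p_value
```

The variant is considered significantly different from the control when the returned p-value is below the experiment's alpha.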

How long to run the experiments? When to stop an experiment?

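An experiment can be stopped once each variant has collected the required sample size, which is derived from alpha, power and the minimum detectable effect. The required sample size increases with lower alpha, higher power and lower minimum detectable effect.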
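
To make this relationship concrete, the sketch below uses the standard normal-approximation formula for comparing two proportions. It assumes a conversion-style metric, a two-sided test and an absolute minimum detectable effect; the function name and default values are illustrative.

```python
import math
from statistics import NormalDist


def sample_size_per_variant(baseline_rate, mde, alpha=0.05, power=0.8):
    """Approximate users needed per variant to detect an absolute lift of mde."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_power = NormalDist().inv_cdf(power)
    p1, p2 = baseline_rate, baseline_rate + mde
    variance = p1 * (1 - p1) + p2 * (1 - p2)
    return math.ceil((z_alpha + z_power) ** 2 * variance / mde ** 2)
```

Because the minimum detectable effect appears squared in the denominator, halving it roughly quadruples the required sample size, which is why very small effects need long-running experiments.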

What are some failure scenarios and how to handle them?

  • Incorrect values for alpha, power or MDE could lead to unreasonable sample sizes, either very low or very high. In such cases we define a minimum and maximum range for the sample size, or let the experiment run for 15 to 30 days before stopping.
  • Failure during request routing, e.g. the variant id cannot be fetched: do not route the request through the experiment and instead treat it as a normal request.
  • Failure to update the log with a response action: retry with exponential backoff.
  • Failure to compute the metrics or update the Postgres DB: do not update processed_line_number for the log file, so that the next cronjob run retries the failed lines from the log file.
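
Retrying with exponential backoff, as mentioned above, can be sketched as follows. The helper name, the default number of attempts and the base delay are assumptions; the `sleep` parameter exists only so that the waiting can be replaced in tests.

```python
import time


def retry_with_backoff(operation, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**attempt and try again."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # give up; the caller can flag this for manual resolution
            sleep(base_delay * 2 ** attempt)
```

With the defaults, the waits between attempts are 0.1 s, 0.2 s, 0.4 s and 0.8 s. Production code would usually add random jitter to the delays and catch only the exceptions that are known to be transient.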

What problems can arise with a single machine system?

  • Fault Tolerance and Availability: With a single server for database, caching, logging and metric analysis, the entire system becomes unavailable if that server goes down.
  • Scalability: Assuming that request routing takes 10ms on average, the QPS an 8 core CPU can serve is 8*1000/10 = 800, but our system must serve 1 million QPS on average.
  • Durability: In-memory HashMaps used for caching configs, enabled users etc. lose all their data if the server shuts down.
  • Storage: Since we are tracking each request and its response in log files, assuming that each log line is approximately 100 bytes, with 1 million QPS the size of the log data for 1 year is approximately 10⁶*100*365*24*3600 bytes ≈ 3 PB.
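
The back-of-the-envelope numbers above can be reproduced with a few lines of Python. The machine count is an extrapolation that is not stated in the text: it assumes throughput scales linearly with machines and leaves no headroom for peaks or failures.

```python
import math

CORES = 8
ROUTING_LATENCY_MS = 10
TARGET_QPS = 1_000_000
LOG_LINE_BYTES = 100
SECONDS_PER_YEAR = 365 * 24 * 3600

# Each core handles 1000 / 10 = 100 requests per second.
qps_per_machine = CORES * 1000 // ROUTING_LATENCY_MS

# Assumption: perfectly linear scaling across machines.
machines_needed = math.ceil(TARGET_QPS / qps_per_machine)

log_bytes_per_year = TARGET_QPS * LOG_LINE_BYTES * SECONDS_PER_YEAR
log_pb_per_year = log_bytes_per_year / 10**15  # decimal petabytes
```

Under these assumptions a single machine serves 800 QPS, so at least 1250 such machines would be needed for routing alone, and the logs grow by a little over 3 PB per year.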

Distributed XP

To enable fault tolerance, we will store 3 replicas of each log file in S3, across different regions.

Approximate size of 1 file = 10⁶*100 bytes ≈ 95 MB.

Approximate number of log files for 1 year including 3 replicas = 3 PB * 3 replicas / 95 MB = 3*1024*1024*1024*3/95 ≈ 100 million.

Assuming 1 billion users, number of bits required for users = log2(1 billion) = 30.

Number of bits required for 1000 experiments = 10 bits

Using Postgres, our table would have the following structure:

enabled_users:
experiment_id, user_id

Total size of table for storing enabled user_ids for all experiments = 40bits*1 billion*1000 experiments = 4.5 TB.
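
The estimate above can be checked with a short calculation. It counts only the raw bits of the two ids; real Postgres rows carry per-row overhead and integer columns are not bit-packed, so the result should be read as a lower bound.

```python
import math

users = 1_000_000_000
experiments = 1_000

bits_per_user_id = math.ceil(math.log2(users))
bits_per_experiment_id = math.ceil(math.log2(experiments))
bits_per_row = bits_per_user_id + bits_per_experiment_id

# Worst case: every user is enabled for every experiment.
table_bytes = bits_per_row * users * experiments / 8
table_tib = table_bytes / 2**40
```

This gives 30 + 10 = 40 bits per row and about 4.5 TB in binary units, matching the figure above.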

To query whether a user_id is enabled for an experiment:

SELECT COUNT(*) AS num_rows FROM enabled_users WHERE experiment_id=<experiment_id> AND user_id=<user_id>;

For fault tolerance, we would use 3 replicas for each partition. With, say, 17 partitions, each having 3 replicas, the size of each partition = 4.5 TB/17 ≈ 271 GB.

partition_id = hash(experiment_id) % 17
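
A small sketch of the partition lookup. CRC32 is an assumption here, chosen because every process has to compute the same partition for a given experiment, and Python's built-in `hash()` of a string changes between processes. The function name is illustrative.

```python
import zlib

NUM_PARTITIONS = 17


def partition_for(experiment_id):
    """Return the partition that stores the rows of this experiment."""
    # crc32 is stable across processes and machines.
    return zlib.crc32(str(experiment_id).encode()) % NUM_PARTITIONS
```

Since the partition depends only on experiment_id, all rows of one experiment live on the same partition, and a lookup for a given (experiment_id, user_id) pair touches a single partition.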

What kind of replication strategy should we use?

For the log files, we would be ok with eventual consistency because:

  1. Log files are append-only, so a replica cannot be in an inconsistent state unless its writes happen out of order, i.e. a message with an earlier timestamp is not written, or is written after a message with a later timestamp. Using a Paxos- or Raft-like protocol, we can ensure that messages are never out of order at any replica.
  2. Scripts reading from the log files will first look up the “processed_line_number” from the DB.

If the maximum line number in a replica ≥ processed_line_number, then we are sure we are reading the correct sequence of data. Some replica may be more recent and have more lines than another, but unless its maximum line number < processed_line_number, it is safe to read from a replica, as its contents are assumed to be correct.
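
The replica check described above can be expressed as a small helper. The function name and the dictionary of per-replica line counts are hypothetical; how those counts are obtained is left open.

```python
def pick_replica(replica_line_counts, processed_line_number):
    """Return the name of a replica that holds every processed line, else None."""
    for name, max_line_number in replica_line_counts.items():
        if max_line_number >= processed_line_number:
            return name
    return None
```

For example, with `{"a": 90, "b": 120}` and a processed_line_number of 100, replica "a" is skipped because it lags behind the checkpoint and "b" is chosen. If no replica has caught up yet, the script can wait and retry later.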

For Postgres, we need strong consistency: if we add or remove a user id from the enabled list, the change should be visible to all subsequent routing requests, otherwise we could end up with corrupted data for analysis.

In synchronous replication, writes go through the master, and a write is successful only once the master and both replicas have been updated. Reads are served by either of the 2 read replicas behind a load balancer.
