ML System Design — Fare estimation

Abhijit Mondal
10 min readMar 19, 2022

--

source: quanta magazine

A cab service like Uber/Lyft is launching its operation in a new city. Design a system to predict the base fares for different source and destination pairs within the city.

Type of ML Problem

It is a linear regression problem as we are predicting fare estimates which are real numbers.

What are the features ?

Following are some features that could be useful for predicting the base fares:

  • Distance from source to destination
  • Average time taken to travel from source to destination
  • Average number of people travelling from source to destination in a day
  • Average number of public transports from source to destination
  • Average fare per person in public transports between source and destination
  • Country where the city belongs
  • Number of cabs (already running or planning to be run) along the route
  • Type of ride (shared vs. non-shared)
  • Price of fuel in the city.

… and so on.

How to get training and testing data, how to get labels ?

Store the completed trips data into a big data databases such as Hadoop or Hive with following trip metadata:

(ride_id, user_id, driver_id, src_lat, src_long, dst_lat, dst_long, city, country, dist, duration, ride_type, base_fare, surge_multiplier, total_fare)

Since it is not really a good idea to group the trips by latitude and longitude, we can group nearby pickup and drop locations in the map using Quad Tree.

We can assume that a QuadTree cell has a maximum of K pickup/drop locations i.e. we do not divide a cell further if it contains less than equal to K pickup/drop locations else divide the cell it into 4 quadrants.

In this way we can assign unique ids to each QuadTree cell which will be used to group by based on source and destination for all trips.

quad tree

For e.g. if we have a table (latitude, longitude, grid_id)

Then we can run the following query to get the average distance between a source and a destination grid.

SELECT
(SELECT grid_id FROM grid WHERE latitude=src_lat AND longitude=src_long) as source,
(SELECT grid_id FROM grid WHERE latitude=dst_lat AND longitude=dst_long) as destination,AVG(dist) FROM trips WHERE city IN (<nearby cities>) GROUP BY source, destination

Similarly we can find the average time taken per source and destination , number of trips made between source and destination.

Similarly we can get the average base fare between a source and destination grid.

SELECT
(SELECT grid_id FROM grid WHERE latitude=src_lat AND longitude=src_long) as source,
(SELECT grid_id FROM grid WHERE latitude=dst_lat AND longitude=dst_long) as destination,AVG(base_fare) as base_fare FROM trips WHERE city IN (<nearby cities>) GROUP BY source, destination

We can create Spark jobs to run these queries on Hive.

We can get the public transport data from government websites.

Note that the public transport data obtained will most likely be during the same time when the cab service was already operational in the city but our goal is to estimate the public transport data before operationalizing in a city. Thus the numbers for public transports and people travelling by public transport would be lower than actual. We can multiply by some factor in the range 1.1–1.3 i.e. 10% to 30% less than actual.

What feature pre-processing steps are required ?

  • Handling correlated features : Note that distance and duration of trip maybe correlated assuming similar traffic conditions but in general this is not so since traffic conditions are very dynamic and depends on external factors. We can use PCA to merge correlated features.
  • Handling missing values and NaNs : Omit all trips where base_fare is missing. Missing distances can be calculated using lat and long. Missing duration can be computed by doing distance/average speed.
  • Outlier detection : Certain trips might take very long time due to traffic conditions or distance is a high number due to driver taking a detour. We can omit trips where time taken or distance lies outside mean + 2*sd value among all trips for the same source and destination pairs.
  • Feature scaling : The values for distances or duration are not of the same scale. We can normalize the feature values between 0 and 1 by doing (x-min(x))/(max(x)-min(x)) where x is the feature value and min(x) is the minimum value of the feature across trips.

How to compute the feature representations ? Handle high cardinality features ?

For numerical features (distance, duration etc.) we can keep the values as it is (after doing feature scaling) or we can do quantile bucketing.

In quantile bucketing the buckets are not fixed size but the number of trips in a bucket are fixed. For e.g. with distance feature, sort the distances first.

D1 ≤ D2 ≤ D3 …. ≤ Dn

Then if the bucket size is K, the first K i.e. D1 to Dk goes into bucket 1, then Dk+1 to D2k goes into bucket 2 and so on. Thus instead of n feature values we have n/K feature values:

D’1, D’2 , … D’n/K

where D’1 = (D1+…+Dk)/K and so on.

For categorical features (such as country and type of ride), we can do one hot encoding. Since number of countries could be around 200, the vectors could be really sparse.

One strategy is to do bucketing. Compute the frequency of trips per country.

Let the country and frequencies of trips sorted in descending order be:

(C1, F1) (C2, F2) … (Cn, Fn) where F1 ≥ F2 ≥ …

Then starting from the 1st country add up the frequencies until the ratio of (F1+F2+…+Fk)/(F1+F2+…+Fn) ≥ 0.9 where k < n.

Assign the labels 1 to C1, 2 to C2, …, k to Ck and k+1 to all countries from Ck+1 to Cn. That we have only k countries instead of n.

Another strategy is to use Hashing technique.

Generate K random permutations of 0.to.m-1 where ‘m’ is the number of countries. For each permutation, there will be one index where the value is 1 and all remaining indices are 0 (since its a one-hot encoding). The new value is the value of the permutation entry for the non-zero index.

Hashing 9 length one-hot encoding vector to 3 length hash.

Concatenate the numerical as well as the one-hot encoding vectors for each trip.

Where to store the feature representations ?

The features could be stored in a database for persistence. We can use NoSQL database such as Cassandra to store the feature values.

Feature table: (trip_id, feature_id, feature_name, value)

For real time inferencing we should also use some in-memory database such as Redis to store the feature values.

How to train the model using the features ?

We can train a linear regression model using Spark which reads the training data from Hive and the features from Cassandra.

Following are the steps to train the model:

  • Split the dataset into training, testing and validation (usually 80–20)
  • Do the hyperparameter tuning (regularization constants etc.) using grid search cross validation (K-fold).
  • We can use the RMSE loss function to train the model as well as optimize the grid search results. RMSE = sum of (true fare — predicted fare)² over all training data.
  • The best hyperparameters are chosen based on minimum loss on the validation data.
  • Final model is trained on the entire training data with the best hyperparameters.

Assuming N training examples and M features.

L = sum_i(1toN) (true_i — pred_i)² + reg*sum_i(1toM) w_i²pred_i = sum_i(1toM) w_i*feature_i

How to evaluate the model offline ? Metrics ?

RMSE loss

How to save the model and model weights, architecture etc. ?

We can pickle the model weights, the model class object, feature transformers etc. and upload them in an S3 bucket.

<bucket_name>/<date>/<version>

How to train when the training data size is too huge for a single machine ?

There are multiple ways by which this can be achieved.

Python Generators

Instead of loading all training data in memory, load batches of say 100 at a time using python generators. Once a batch is processed delete it from memory. Disadvantage is that there would be lots of disk I/O which might increase the training time.

S3 Pipe Mode

Upload the training data in S3 instead of Hive and then use Pipe mode during training with Sagemaker. Data is directly streamed to the training algorithm instead of saving it on local disk and then loading in memory. Here we have network I/O which might slow down training.

Distributed Training (Parameter Sharing)

  • Split the training dataset across multiple nodes/machines.
  • Copy the testing dataset in all the machines.
  • Create a table in Cassandra to track the weights learned. (model_id, weights, loss, last_updated_timestamp)
  • Create another table to track the cross validation runs. (model_id, machine_id, fold_num, hyperparameter_hash, hyperparameters, validation_loss)
  • In each machine, divide the training set into K equal parts for K fold CV.
  • Before running the hyperpaparmeter CV tuning job, update the hyperparameters table with the model_id, machine_id, fold_num, hyperparameters data. The hyperparameters is JSON string and is same for all fold_num coresponding to a model_id.
  • After finding the validation loss for a hyperparameter setting update the validation_loss column corresponding to model_id, machine_id, fold_num and hyperparameters in Cassandra.
  • To find the best hyperparameters for a model, do
SELECT hyperparameters 
FROM
(SELECT model_id, hyperparameters, AVG(validation_loss) as loss GROUP BY model_id, hyperparameter_hash)
HAVING loss=MIN(loss)
  • Before each epoch, fetch the ‘weights’ from the weights table and run gradient descent on these set of weights.
  • After finding the updated weights and the loss on the training data in one epoch, send the updated weights with the loss to Cassandra. If the timestamp is greater than the last_updated_timestamp then the weights and loss are overwritten in the weights table else they are ignored.

How to deploy the inferencing codes into production ?

  • Create Flask endpoints for inferencing.
  • In the constructor, download the model artifacts from S3 bucket above and initialize the model class and feature transformer classes.
  • To make Flask multithreaded, use Gunicorn on top of Flask.
  • Use Github+Jenkins to build a CI/CD Pipeline.
  • Whenever a PR for the inferencing codes is created in Github, trigger Jenkins build i.e. run any unit and integration test cases and if all test cases passes, then build docker image and deploy the docker image in a Kubernetes cluster.
  • Use at-least 3 replicas for the pod running the docker service to allow for load balancing.

How to monitor the models in production ?

Setup logging and observability with Datadog or Cloudwatch (if using AWS).

Some performance metrics we can track are:

  • Number of 5xx errors in the last 5 minutes.
  • P99 latency every 5 minutes.
  • Number of requests per second.
  • CPU load across different nodes running the inferencing service.
  • Memory usage across different nodes running the inferencing service.
  • Number of Exceptions in the last 5 minutes.

Model metrics we should track:

  • Data drift — For each feature pre-compute the mean and variance in the training dataset. In production compute the mean and variance of feature values in streaming way and after every 5 minutes, compute the KL Divergence between the training and production values.

How to do online evaluation ?

Using A/B Testing with different online metrics such as: number of bookings, number of cancellations after price is shown on app, number of uninstallations of app, etc.

We can use chi-squared measure or Z-test to validate whether our model is actually working or not.

One strategy is to compare the ML model against a simple heuristic such as average of the base_fares of nearby cities. For each source and destination pair find the distance then find the closest matching distance among all the nearby cities and then take those base_fares and average over all cities. This is the null hypothesis.

Divide the set of users in a city into 2 equal groups, one group is getting base_fares with null hypothesis (model0) and another group with the above ML model (model1).

Z-test:

For the next 30 days compute the number of booking each day from both models. Let u0 be the mean number of bookings from model0 and u1 from model1. Similarly let v0 be the variance from model0 and v1 from model1.

Z score = (u1-u0)/(sqrt((v0+v1)/30))

If the Z-score is much higher than 0, then our model1 is outperforming model0. To get the confidence we can find the p-value.

But the drawback here is that it assumes normal distribution of bookings which may not be true.

It is not possible to do chi-square evaluation on number of bookings because we do not know why user has not booked a ride. We can modify the metric a bit for example, instead of number of bookings, we can use number of unique users who made bookings in last 30 days.

Thus we would have 4 different counts:

  • Number of users who booked after seeing price from model0
  • Number of users who did not book after seeing price from model0
  • Number of users who booked after seeing price from model1
  • Number of users who did not book after seeing price from model1

Number of users who did not book = Number of users who have app installed-Number of users who booked.

We can compute chi-square score using the above 4 counts.

Pipeline

Training

Inference

--

--

Responses (1)