source: quanta magazine

Building a multi-client chat server with select and epoll

Abhijit Mondal

--

Chat sessions are maintained by a chat server, which coordinates multiple user sessions: it manages user identities, accepts messages from senders, forwards them to recipients, tracks user login and logout, and so on.

A full-fledged chat server also needs a database to persist user and session information. This could be a simple file or a distributed data store such as Cassandra or Redis. In this post we will forgo the database part and focus on one of the most important aspects, i.e.

How to scale the chat server to millions of active user sessions.

In order to ensure that the chat server is not blocked on a particular client socket, the first thing that comes to mind is using Threads.

Using multiple threads ensures that the chat server keeps accepting and forwarding messages from and to multiple clients concurrently, without being blocked on any particular socket.

But there are a few problems:

  1. Creating millions of threads on one chat server is a bad idea and will most likely fail with a resource error. We could instead run, say, 1000 chat servers with 1000 threads each, so that together they serve a million sessions at any time.
  2. Threads are not cheap. Creating 1000 threads whenever the server crashes or restarts is expensive. Moreover, the OS needs to handle context switching among threads. Even with 1000 threads, all 1000 will not be working in parallel. On an 8-core CPU, only 8 threads and/or processes can run in parallel. The rest have to wait for a context switch, which happens when a running thread blocks, exits, or uses up its time slice.
  3. When forwarding messages to clients over a socket, if multiple clients are sending messages to a single client, multiple threads end up writing to the same socket file descriptor. Without a lock on the socket file descriptor, this could lead to out-of-order messages or, worse, interleaved (jumbled) messages. If we do lock the socket, there could be resource starvation, where other clients keep waiting to send their messages.
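The locking mentioned in the 3rd problem can be sketched as follows. This is a minimal illustration, not part of the server built in this post: LockedSocket and send_message are hypothetical names, and the sketch assumes one mutex per client socket and blocking sockets.

```cpp
#include <mutex>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>

// Hypothetical wrapper: one mutex per client socket.
struct LockedSocket {
    int fd;
    std::mutex mtx;
    explicit LockedSocket(int fd_) : fd(fd_) {}
};

// Sends the whole message while holding the lock, so bytes written by
// two sender threads are never interleaved on the same socket.
// Returns true if every byte was handed to the kernel.
bool send_message(LockedSocket &sock, const std::string &msg) {
    std::lock_guard<std::mutex> guard(sock.mtx);
    size_t sent = 0;
    while (sent < msg.size()) {
        ssize_t n = send(sock.fd, msg.data() + sent, msg.size() - sent, 0);
        if (n <= 0) return false;  // error or peer closed
        sent += static_cast<size_t>(n);
    }
    return true;
}
```

While one thread holds the lock, every other thread that wants to write to the same client has to wait, which is the starvation risk described above.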

To overcome these problems with threads, a more scalable option is the select() system call.

select(FD_SETSIZE, &read_fds, &write_fds, &error_fds, NULL);

select() is used to monitor which socket file descriptors are ready for reading and writing. Thus the main thread does not have to wait for data to be present on any one socket. select() reports only the sockets that are ready.

The select method accepts 5 arguments:

  1. The 1st argument (nfds) is the highest-numbered file descriptor in any of the three sets, plus 1. select checks descriptors 0 through nfds - 1. Passing FD_SETSIZE, as above, makes it check every descriptor that an fd_set can hold. On Linux FD_SETSIZE = 1024, so select can only monitor descriptors numbered 0 to 1023.
  2. The 2nd argument is a pointer to an fd_set, a bit set with one bit per file descriptor number. Before the call, we set the bits of the sockets we want to watch for reading. On return, only the bits of the sockets that are ready to be read remain set. For e.g. if bits 3 and 5 are set on return, then file descriptors 3 and 5 have data (or, for a listening socket, a pending connection).
  3. The 3rd argument is a pointer to an fd_set used the same way for writing: on return, the set bits mark the sockets that are ready for sending messages.
  4. The 4th argument is a pointer to an fd_set for sockets with exceptional conditions, for e.g. out-of-band data on a TCP socket.
  5. The 5th and last argument is the timeout, a pointer to a struct timeval. If it is NULL, select blocks until at least one socket is ready. If it points to a value of say 5 seconds, select returns after at most 5 seconds even if no socket became ready. A timeout of zero makes select return immediately.

Note that the 2nd, 3rd and 4th sets are not mutually exclusive i.e. the same socket file descriptor can be present in both read_fds and write_fds if we are interested in both conditions.

The timeout applies to the select call itself and works whether or not the sockets are set to non-blocking mode.

Let’s create the file server.cpp.

The 1st step is to create the socket on which the server listens for connections from all its clients.

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

int create_tcp_server_socket() {
    const int opt = 1;

    /* Step1: create a TCP socket */
    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (fd == -1) {
        perror("Could not create socket");
        exit(EXIT_FAILURE);
    }

    printf("Created a socket with fd: %d\n", fd);

    /* Step2: set socket options (allow quick restart on the same port) */
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
                   &opt, sizeof(opt)) == -1) {
        perror("Could not set socket options");
        close(fd);
        exit(EXIT_FAILURE);
    }

    /* Initialize the socket address structure */
    /* Listen on port 5001 on all local interfaces */
    struct sockaddr_in saddr;
    memset(&saddr, 0, sizeof(saddr));

    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(5001);
    saddr.sin_addr.s_addr = htonl(INADDR_ANY);

    /* Step3: bind the socket to port 5001 */
    if (bind(fd, (struct sockaddr *)&saddr,
             sizeof(struct sockaddr_in)) == -1) {
        perror("Could not bind to socket");
        close(fd);
        exit(EXIT_FAILURE);
    }

    /* Step4: listen for incoming connections */
    /*
    The socket will allow up to 1000 pending connections to be queued
    (the kernel may cap this value) before refusing connection requests.
    */
    if (listen(fd, 1000) == -1) {
        perror("Could not listen on socket");
        close(fd);
        exit(EXIT_FAILURE);
    }

    return fd;
}

Next step would be to initialize the server socket to listen to incoming clients and set it to non-blocking:

/* Get the socket server fd (the function exits on failure) */
int server_fd = create_tcp_server_socket();

/* Make the socket non blocking, so that accept will not wait for
   a connection indefinitely */
if (fcntl(server_fd, F_SETFL, O_NONBLOCK) == -1) {
    perror("Could not set socket to non-blocking");
    exit(EXIT_FAILURE);
}

The sockets need to be set to non-blocking so that the server never gets stuck in accept, send or recv on one socket while other clients are waiting to connect or to be served.

The next step is to run an infinite loop that uses select to detect every socket with a pending connection request or incoming data:

std::vector<int> all_connections;
all_connections.push_back(server_fd);

fd_set read_fd_set;

while (1) {
    /* select overwrites the set, so rebuild it on every iteration:
       clear all bits, then set one bit per open socket */
    FD_ZERO(&read_fd_set);
    for (size_t i = 0; i < all_connections.size(); i++) {
        if (all_connections[i] >= 0) {
            FD_SET(all_connections[i], &read_fd_set);
        }
    }

    int ret_val = select(FD_SETSIZE, &read_fd_set, NULL,
                         NULL, NULL);

    if (ret_val > 0) {
        // Some socket has data or connection request
        for (size_t i = 0; i < all_connections.size(); i++) {
            if ((all_connections[i] >= 0) &&
                (FD_ISSET(all_connections[i], &read_fd_set))) {

                if (i == 0) {
                    // This is a new client request
                    int client =
                        accept_new_connection_request(server_fd);

                    if (client != -1) {
                        all_connections.push_back(client);
                    }
                }
                else {
                    // This is data sent by an existing client
                    recv_and_forward_message(all_connections[i]);
                }
            }
        }
    }
}

all_connections is an array that stores the server socket followed by the file descriptors of the clients. These are used to receive and forward messages.

select modifies read_fd_set in place: when it returns, only the bits of the sockets that have an event remain set. So before each call we have to clear the set and set the bits of all the sockets we want to monitor again. If we skipped this, the sockets that were idle in one iteration would no longer be monitored in the next one.

The method to accept new connection requests:

int accept_new_connection_request(int fd) {
    struct sockaddr_in new_addr;
    socklen_t addrlen = sizeof(new_addr);

    int client = accept(fd, (struct sockaddr*)&new_addr, &addrlen);

    if (client != -1) {
        /* Set the new socket to non-blocking */
        fcntl(client, F_SETFL, O_NONBLOCK);
        return client;
    }

    /* EAGAIN/EWOULDBLOCK means there is no pending connection left
       to accept; anything else is a real error */
    if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
        perror("Could not accept connection");
    }
    return -1;
}

Next is the method that receives messages from a client and then forwards them.

Since a client can send messages of any size (within some restrictions), it is a good idea to read the data in small chunks over multiple iterations.

The chunk size is DATA_BUFFER in this case.

The protocol is such that each message sent by a client or the server is terminated by the identifier “<EOM>”. This allows the receiver to split a chunk that contains multiple messages as well as to join a message that arrives in several chunks.
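The splitting and joining can be illustrated in isolation with a small buffer class that is fed raw chunks and returns only complete messages. MessageFramer is a hypothetical name and this is a sketch of the idea; keeping one such object per connection is an assumption, not something the server code in this post does.

```cpp
#include <string>
#include <vector>

// Hypothetical per-connection buffer for the "<EOM>"-delimited protocol.
class MessageFramer {
public:
    // Appends newly received bytes and returns every complete message.
    // Bytes after the last delimiter stay buffered for the next call.
    std::vector<std::string> feed(const std::string &chunk) {
        static const std::string delim = "<EOM>";
        buffer_ += chunk;

        std::vector<std::string> messages;
        size_t pos;
        while ((pos = buffer_.find(delim)) != std::string::npos) {
            if (pos > 0) messages.push_back(buffer_.substr(0, pos));
            buffer_.erase(0, pos + delim.size());
        }
        return messages;
    }

    // The incomplete message received so far, if any.
    const std::string &pending() const { return buffer_; }

private:
    std::string buffer_;
};
```

For example, feeding "hi<EOM>how a" returns the single message "hi" and keeps "how a" buffered until the rest of that message and its delimiter arrive.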

void recv_and_forward_message(int fd) {
    /* Holds an incomplete message between chunks. Note that it is local,
       so a message still incomplete when the loop ends is dropped. */
    std::string remainder = "";

    while (1) {
        char buf[DATA_BUFFER];
        int ret_data = recv(fd, buf, DATA_BUFFER, 0);

        if (ret_data > 0) {
            /* Read ret_data number of bytes from buf */
            std::string msg(buf, buf + ret_data);
            msg = remainder + msg;

            /* Parse and split incoming bytes into individual messages.
               split leaves the incomplete tail in msg. */
            std::vector<std::string> parts = split(msg, "<EOM>");
            remainder = msg;

            forward_message(parts);
        }
        else {
            /* 0: the client closed the connection.
               -1: no more data for now (EAGAIN) or an error. */
            break;
        }
    }
}

The split functionality is quite easily implemented in C++:

/* Returns the complete messages found in s. On return, s holds only
   the text after the last delimiter (an incomplete message). */
std::vector<std::string> split(std::string &s, std::string delimiter) {
    size_t pos = 0;
    std::vector<std::string> parts;

    while ((pos = s.find(delimiter)) != std::string::npos) {
        std::string token = s.substr(0, pos);
        if (token.size() > 0) parts.push_back(token);
        s.erase(0, pos + delimiter.length());
    }

    return parts;
}

The above chat server works as follows:

Step 1: Create the TCP socket for listening to all incoming client connection requests.

Step 2: Use select to identify which sockets have an incoming request or data. If the socket with file descriptor n has a request or data, then bit n in read_fd_set is set to 1 when select returns. All remaining bits are 0.

Step 3: If the bit of the server socket is set i.e. there is a new incoming request, accept the connection and add the new client socket to all_connections.

Step 4: If the bit of another socket is set i.e. an existing client has sent data, receive the data from that client socket. To find these sockets we loop through all connections and check which bits are set.

Note that there are multiple ways to implement the read loop inside the outer while (1) loop.

Instead of looping over one socket until all of its data has been read, one can also read the 1st chunk of data from each socket that has data, then in the next iteration read the 2nd chunk from each socket, and so on.

This approach is preferable when clients send messages very frequently. In that case the implemented approach keeps receiving messages from one client until that client pauses, whereas the suggested approach gives a fair share to each client.

The drawbacks with the select approach are as follows:

  1. ‘select’ can only monitor sockets whose file descriptors are in the range 0 to 1023, since FD_SETSIZE is 1024.
  2. ‘select’ does not return only the sockets which have data. It updates a bit set covering all sockets, in which the bits of the sockets that have data are set. Thus we need to check the bit of every socket we are tracking to identify the ready ones, even if only one of them is active.

Another approach is to use the Linux epoll API:

With epoll, each call to epoll_wait returns only the sockets on which an event has occurred. This makes it much more efficient than select() when there are many connections and most of them are idle.

Our 1st step as before would be to initialize the server socket to listen to incoming clients:

/* Get the socket server fd (the function exits on failure) */
int server_fd = create_tcp_server_socket();

/* Make the socket non blocking, so that accept will not wait for
   a connection indefinitely */
if (fcntl(server_fd, F_SETFL, O_NONBLOCK) == -1) {
    perror("Could not set socket to non-blocking");
    exit(EXIT_FAILURE);
}

The next step would be to create and initialize the epoll instance:

struct epoll_event ev, events[MAX_EVENTS];

/* Create epoll instance */
int efd = epoll_create1(0);

if (efd == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

ev.data.fd = server_fd;

/* Interested in read events using edge triggered mode */
ev.events = EPOLLIN | EPOLLET;

/* Allow epoll to monitor the server_fd socket */
if (epoll_ctl(efd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
    perror("epoll_ctl");
    exit(EXIT_FAILURE);
}

MAX_EVENTS is the maximum number of events that a single call to epoll_wait will return. It does not limit the number of sockets that epoll can monitor.

For each epoll event we need to define the socket which we want to monitor with epoll and the type of event on that socket i.e. read, write etc.

For read events we use the EPOLLIN flag and for write events we use the EPOLLOUT flag. There are 2 different modes available with epoll for monitoring: Level Triggered and Edge Triggered.

In Level Triggered mode, epoll keeps returning an event for a socket as long as unread data remains on it, whereas in Edge Triggered mode, epoll returns an event only when new data arrives on the socket.

Thus with Edge Triggered mode, if we do not read all the data off the socket before the next epoll call, that call will not return the event again until more data arrives. This is why the sockets are non-blocking and each one is read in a loop until there is no data left.

To enable Edge Triggered mode, we use another flag EPOLLET.

Lastly, we add our event to the epoll instance with epoll_ctl. A socket that has not been added will not be monitored.

Next step would be to run the infinite loop to detect all “interesting” events with epoll and then decide what to do with them:

while (1) {
    /* Returns only sockets for which there are events */
    int nfds = epoll_wait(efd, events, MAX_EVENTS, -1);

    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    /* Iterate over sockets only having events */
    for (int i = 0; i < nfds; i++) {
        int fd = events[i].data.fd;

        if (fd == server_fd) {
            /* New connection request received */
            accept_new_connection_request(fd);
        }

        else if ((events[i].events & EPOLLERR) ||
                 (events[i].events & EPOLLHUP) ||
                 (!(events[i].events & EPOLLIN))) {

            /* Client connection closed or in error; closing the fd
               also removes it from the epoll instance */
            close(fd);
        }

        else {
            /* Received data on an existing client socket */
            recv_and_forward_message(fd);
        }
    }
}

Accepting a new client connection request works as follows:

/* efd (the epoll instance) is assumed to be visible here, e.g. as a global */
void accept_new_connection_request(int fd) {
    struct sockaddr_in new_addr;
    struct epoll_event ev;

    /* Edge triggered mode: keep accepting until the queue is empty */
    while (1) {
        /* Accept new connections; accept overwrites addrlen, so reset it */
        socklen_t addrlen = sizeof(new_addr);
        int conn_sock = accept(fd, (struct sockaddr*)&new_addr, &addrlen);

        if (conn_sock == -1) {
            /* EAGAIN/EWOULDBLOCK: we have processed all incoming
               connections. Anything else is a real error. */
            if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
                perror("accept");
            }
            break;
        }

        /* Make the new connection non blocking */
        fcntl(conn_sock, F_SETFL, O_NONBLOCK);

        /* Monitor new connection for read events in edge triggered mode */
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = conn_sock;

        /* Allow epoll to monitor new connection */
        if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
            perror("epoll_ctl: conn_sock");
            close(conn_sock);
            break;
        }
    }
}

The method for receiving and forwarding messages from and to clients is the same as the one we saw for select.
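The forwarding step itself is not shown in either version of the server. One possible shape, sketched here purely as an assumption, is to broadcast each message to every connected client except the sender. The function name broadcast_message and the broadcast policy are hypothetical; a real chat server would look up the intended recipient instead.

```cpp
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <vector>

// Hypothetical forwarding policy: deliver text to every client except
// the sender, terminated by the "<EOM>" delimiter.
// Returns the number of clients that received the full message.
int broadcast_message(const std::vector<int> &clients, int sender_fd,
                      const std::string &text) {
    const std::string framed = text + "<EOM>";
    int delivered = 0;

    for (int fd : clients) {
        if (fd < 0 || fd == sender_fd) continue;

        size_t sent = 0;
        while (sent < framed.size()) {
            // MSG_NOSIGNAL avoids SIGPIPE if the client has gone away.
            ssize_t n = send(fd, framed.data() + sent,
                             framed.size() - sent, MSG_NOSIGNAL);
            if (n <= 0) break;  // EAGAIN, error or closed: give up here
            sent += static_cast<size_t>(n);
        }
        if (sent == framed.size()) delivered++;
    }
    return delivered;
}
```

On a non-blocking socket send can stop early with EAGAIN when the kernel buffer is full. This sketch simply gives up on that client, whereas a production server would queue the unsent bytes and wait for a write (EPOLLOUT) event.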

For completeness, here is a simple client implementation for the server code. It reads user input from STDIN and runs a background thread that receives incoming messages from the server:

client.cpp:

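#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#define MAX_LIMIT 4096

/* Same split function as in server.cpp */
std::vector<std::string> split(std::string &s, std::string delimiter);

void recv_messages(int server_fd) {
    int ret_data;
    std::string remainder = "";

    while (1) {
        char buf[MAX_LIMIT];
        ret_data = recv(server_fd, buf, MAX_LIMIT, 0);

        if (ret_data <= 0) {
            /* 0: the connection was closed, -1: error. Stop reading
               instead of spinning on a dead socket. */
            break;
        }

        std::string msg(buf, buf + ret_data);
        msg = remainder + msg;
        std::vector<std::string> parts = split(msg, "<EOM>");
        remainder = msg;

        for (size_t i = 0; i < parts.size(); i++) {
            std::cout << parts[i] << std::endl;
        }
    }
}

int main () {
    /* Step1: create a TCP socket */
    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    if (fd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("Created a socket with fd: %d\n", fd);

    /* Let us initialize the server address structure */
    struct sockaddr_in saddr;
    memset(&saddr, 0, sizeof(saddr));

    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(5001);
    struct hostent *local_host = gethostbyname("127.0.0.1");
    if (local_host == NULL) {
        fprintf(stderr, "Could not resolve host\n");
        close(fd);
        exit(EXIT_FAILURE);
    }
    saddr.sin_addr = *((struct in_addr *)local_host->h_addr);

    /* Step2: connect to the TCP server socket */
    int ret_val = connect(fd, (struct sockaddr *)&saddr,
                          sizeof(struct sockaddr_in));

    if (ret_val == -1) {
        perror("connect failed");
        close(fd);
        exit(EXIT_FAILURE);
    }
    printf("The Socket is now connected\n");

    /* Background thread that prints messages from the server */
    std::thread t(recv_messages, fd);

    char msg[MAX_LIMIT];

    /* Read lines from STDIN until EOF and send each one, terminated
       by the "<EOM>" delimiter that the protocol expects */
    while (fgets(msg, MAX_LIMIT, stdin) != NULL) {
        std::string out(msg);
        if (!out.empty() && out.back() == '\n') out.pop_back();
        out += "<EOM>";

        ret_val = send(fd, out.c_str(), out.size(), 0);
        if (ret_val == -1) {
            perror("send failed");
            break;
        }
    }

    /* Shut down the connection so that recv in the background
       thread returns and the thread can be joined */
    shutdown(fd, SHUT_RDWR);
    t.join();

    /* Last step: close the socket */
    close(fd);
    return 0;
}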

References

  1. Using epoll I/O event notification to implement an asynchronous server | xdecroc tech musings (wordpress.com)
  2. epoll(7) — Linux manual page (man7.org)
  3. select(2) — Linux manual page (man7.org)
