Caching

Vasile Păpăluță
9 min read · Jun 19, 2023


Distributed Systems combine hundreds of different services, each of them handling different requests, processing and generating large amounts of data. Some requests are harder to process and require more resources and time, yet they rarely need to be recomputed at every page reload. Take, for example, a user’s playlist of songs or videos. At every request the system would make a couple of queries and then return the result to the user, and every request would take the same amount of time. By adding a caching service, only the first request takes that long; the following ones are much faster.

The problem statement.

Each request to a distributed system requires some computation and time resources. In the case of simple requests, this is pretty straightforward. Things get more complicated with requests that are repeated often, involve accessing a lot of services at the same time, and still require a fast response. For example, loading a profile page on social media. There is little chance that an update happened since the last visit, and even if it did, the update takes time to propagate through the system anyway. However, the user wants a quick response. So when a user requests this page, the result is saved in the cache for some time. If the next request for this page arrives within this time, the user gets the cached page instead of the page from the original service, getting a much faster response. This process is illustrated in Figure 1.

Figure 1: The Use of Cache in a Distributed System.
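To make the flow in Figure 1 concrete, here is a minimal in-process sketch of the pattern; the fetch_profile_page function, the dictionary cache, and the 60-second lifetime are hypothetical stand-ins for the real services and caching policy:

import time

# A toy in-process cache mapping a key to a (value, expiry timestamp) pair.
CACHE = {}
TTL_SECONDS = 60  # hypothetical lifetime of a cached page

def fetch_profile_page(user_id):
    # Placeholder for the expensive multi-service computation.
    return f"<profile page of {user_id}>"

def get_profile_page(user_id):
    entry = CACHE.get(user_id)
    if entry is not None and entry[1] > time.time():
        # Cache hit: return the saved page without recomputing it.
        return entry[0]
    # Cache miss or expired entry: compute the page, cache it, return it.
    page = fetch_profile_page(user_id)
    CACHE[user_id] = (page, time.time() + TTL_SECONDS)
    return page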

CAP Theorem.

Before getting to how to implement a Cache, I want to explain some theory behind Distributed Systems. I will try to be as fast as possible, and not too boring 😺. The CAP Theorem was proposed in 2000 by Eric Brewer, and it describes a fundamental trade-off between 3 properties of Distributed Systems:

  • Consistency — Simply put, the property that every server returns the correct, most recent response to each request.
  • Availability — The property of a system that ensures a response for each incoming request.
  • Partition Tolerance — The ability of a system to continue working despite delayed or lost messages between its nodes.

The CAP theorem says that no distributed system can ensure all three of them at the same time. Bad news, yeah? Start panicking, I only started 😺. The idea is that a distributed system cannot exist without Partition Tolerance. Why? Because in such a case, the system will go down pretty fast. So we have only two choices — CP or AP applications.

So, in reality, we must choose between having a highly Available and basically Consistent system, or a highly Consistent and basically Available system. Different types of systems require different combinations of these two properties. In the case of a Cache, we are going to implement a highly available and basically consistent system. In one of the next articles, I will show you how to build a CP system.

First things first.

First, we need to take care of Partition Tolerance. In a distributed system anything can happen, from dead nodes to lost packets. In the case of lost packets, we can simply resend the message, or send it to another service replica. So we need multiple replicas of the same service to store cached values. But there is a problem: we don’t want to store the same information on all replicas. That would be highly inefficient. Here we can make use of sharded services.

In the case of sharded services, instead of having one service we split it into a number of shards, each of them handling a part of the requests. In this case, if a shard falls, the rest of the nodes continue to work; even more, some of the remaining services can take over the responsibilities of the fallen brother while it is restarted. But to do that we need a technique named Consistent Hashing.

Usually, when we want to forward a request to one of the shards, we first find the responsible one. To do that we take a field of the request, or the whole request, and compute its hash. The hashing function returns an integer that is roughly evenly distributed. This is illustrated in the image below:

Figure 2: Hashing function.

By taking the hash mod n we get the responsible service, where n is the number of shard services. But suppose the resulting index points to a service that isn’t working; then we cannot forward the request there. In this case, we can do something different and use consistent hashing. With consistent hashing, instead of computing hash mod n we compute hash mod 360, getting a value between 0 and 359. In this setting, every service is assigned to a degree on a circle, and we simply find the degree nearest to the computed value; the service assigned to that degree is the one responsible for the request. In such a way, even if a service is down, we can always find a new responsible service by moving to the next nearest degree. To make this possible we are going to use a structure named a Hash Ring, illustrated below. In such a way we can ensure a weak form of consistency. In another tutorial, we will look at how to ensure full Consistency.
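As a quick worked example, suppose the three services sit at degrees 0, 120, and 240 (the same layout used in the implementation below), and a request hashes to 1000. Then 1000 mod 360 = 280, and the degree nearest to 280, measured around the circle, is 240, so the third service is responsible:

degrees = [0, 120, 240]    # one degree per shard service
hash_value = 1000          # a hypothetical hash of the request
point = hash_value % 360   # 280

# The distance around the circle, so that 359 counts as close to 0.
def ring_distance(a, b):
    return min(abs(a - b), 360 - abs(a - b))

responsible = min(degrees, key=lambda d: ring_distance(point, d))
print(responsible)  # 240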

Ensuring Availability.

So how do we ensure availability in this setting? Simply by copying the structure above 2 or more times. But first, we need to add a kind of Gateway in front of our sharded cache. The Gateway will be responsible for computing the hash of the request, finding the responsible node, and forwarding the request to it. In this way we get the following tree structure.

Figure 3: The tree structure of the Caching cluster.

Now we double these service clusters and get the structure below:

Figure 4: Two caching clusters.

When the client wants to save something in the cache, it sends 2 requests, one per cluster. In such a way we have one copy of the data per cluster. And when the client wants to read some value from the cache, it sends the request to one of the clusters; in case of getting an error (because of the unavailability of that cluster, for example), it can ask the second one and still get the required value. In such a way the cache ensures high availability. Now let’s see how we implement such services.

The Python implementation.

To implement the structure shown in Figure 3 we are going to use two types of services. As leaf nodes, we are going to use Memcached: a distributed, open-source, high-performance memory object caching system. It allows one to store values for a specified period of time. The Memcached services can be defined in a docker-compose file, so we are going to define them there.

NOTE: Please make sure that you have Docker Desktop installed and it’s running.
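The docker-compose.yml itself could look like the minimal sketch below, assuming six Memcached containers, three per caching cluster, published on host ports 11211 through 11216; the service names are illustrative:

version: "3.8"
services:
  # First caching cluster.
  memcached1:
    image: memcached
    ports:
      - "11211:11211"
  memcached2:
    image: memcached
    ports:
      - "11212:11211"
  memcached3:
    image: memcached
    ports:
      - "11213:11211"
  # Second caching cluster.
  memcached4:
    image: memcached
    ports:
      - "11214:11211"
  memcached5:
    image: memcached
    ports:
      - "11215:11211"
  memcached6:
    image: memcached
    ports:
      - "11216:11211"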

To build them, just run the following command in the terminal:

docker compose build

Then run the next command to start the services:

docker compose up

These commands will start all the services. You can keep them running or not while developing the cache gateways; just don’t forget to start them with the second command when you want to test the whole app.

Next, Memcached services use the TCP protocol for communication, but fortunately we don’t need to implement the communication ourselves. For this purpose we can use the pymemcache library. To install it, use the following command:

pip install pymemcache

Now we can get to implementing the cache gateway service. First, we import the Flask modules, hashlib for stable hashing, and pymemcache’s client and serializers.

# Importing all needed modules.
import hashlib

from flask import Flask, request
from pymemcache import serde
from pymemcache.client import base

Next, we need to define the memcached clients:

# Defining the Memcached clients.
# The pickle serde lets us store and retrieve Python dicts directly.
memcache_client1 = base.Client(("localhost", 11211), serde=serde.pickle_serde)
memcache_client2 = base.Client(("localhost", 11212), serde=serde.pickle_serde)
memcache_client3 = base.Client(("localhost", 11213), serde=serde.pickle_serde)

The Client class takes as its first argument a tuple with the host and the port of the service to connect to; the serde argument tells pymemcache how to serialize values, and the pickle serde lets us store the request body (a Python dict) directly. We are going to use these clients to interact with the Memcached services.
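Each client exposes simple set and get operations. As a quick sanity check, with the first Memcached container running, we can store a value for 30 seconds and read it back:

# Storing a value for 30 seconds, then reading it back.
memcache_client1.set("greeting", "hello", expire=30)
print(memcache_client1.get("greeting"))     # 'hello'
print(memcache_client1.get("missing-key"))  # None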

Now we implement the Hash Ring as a dictionary mapping a degree to a Memcached client. We also need to implement the function that finds the responsible service for a request:

# Defining the Hash Ring.
HASH_RING = {
    0 : memcache_client1,
    120 : memcache_client2,
    240 : memcache_client3
}

def find_memcache_service(request_body):
    '''
    This function returns, based on the request body, the responsible service for this request.
    :param request_body: dict
        The content of the request.
    :return: int
        The degree of the Memcached service responsible for the request.
    '''
    chosen_service_index = 0
    min_difference = 360

    # Computing a stable hash of the user id and calculating the hash mod 360.
    # (hashlib is used instead of the built-in hash(), which is randomized per process.)
    hash_payload = int(hashlib.md5(request_body["user_id"].encode()).hexdigest(), 16)
    hash_mod_360 = hash_payload % 360

    # Finding the degree with the lowest distance from hash mod 360.
    for index in HASH_RING:
        # Calculating the distance around the ring between the degree and hash mod 360,
        # so that values near 359 count as close to the degree 0.
        difference = min(abs(hash_mod_360 - index), 360 - abs(hash_mod_360 - index))
        if difference < min_difference:
            # Updating the nearest degree.
            min_difference = difference
            chosen_service_index = index
    return chosen_service_index

As shown in the listing above, when a request comes in, the service computes a stable hash of the user id from the request, followed by the hash mod 360. Then, from the Hash Ring, it picks the Memcached service with the lowest distance between the hash mod 360 and the assigned degree. This is the service to which the data is forwarded.

Going further, we need to implement the endpoints of the Flask service, starting with the /save one.

# Creating the Flask application.
app = Flask(__name__)

@app.route("/save", methods=["POST"])
def save():
    '''
    This endpoint processes the save requests.
    '''
    # Extracting the request body.
    request_body = request.json

    # Getting the index (degree) of the responsible Memcached service.
    memcache_index = find_memcache_service(request_body)

    # Sending the request to the Memcached service.
    HASH_RING[memcache_index].set(request_body["user_id"],
                                  request_body,
                                  expire=30)
    return {
        "message" : "Saved!"
    }, 200

During the processing of the request, we extract the request body and find the responsible Memcached service, then save the data to this service for 30 seconds, returning a confirmation message.

During the processing of the /cache requests, we do the same things: extract the data, find the responsible Memcached service, and get the value. If the value is present in the responsible shard, we return it with a 200 status code; if not, we return an error message with a 404 status code:

@app.route("/cache", methods=["GET"])
def cache():
'''
This endpoint processes the caching requests.
'''
# Extracting the request body.
request_body = request.json

# Getting the index (degree) of the responsible Memcached service.
memcache_index = find_memcache_service(request_body)

# Getting the cached value from the responsible Memcached service.
cached_value = HASH_RING[memcache_index].get(request_body["user_id"])

# Returning the requested value or the error message.
if cached_value:
return cached_value, 200
else:
return {
"message" : "No such data"
}, 404

Finally run the app:

# Running the main service.
app.run()

NOTE: When running the application, don’t forget to have the docker compose services running too.

During the development of the code for this article, I used the following request schemas, the first for /save and the second for /cache:

{
    "user_id" : "55852755-068a-48ce-b7b1-89e60320cc65",
    "playlists" : [
        "0d37b63d-61dc-4bf4-85bf-27e46558f9be",
        "3d56c68d-54c9-47ed-b863-8d67748b5f1c",
        "ca97bbde-e0fe-4c50-8225-0dbfcc12b431"
    ]
}

{
    "user_id" : "55852755-068a-48ce-b7b1-89e60320cc65"
}
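With the Docker services and the gateway running, we can check both endpoints end to end using these schemas. The sketch below assumes the gateway listens on Flask’s default port 5000 and uses the requests library (pip install requests):

import requests

payload = {
    "user_id" : "55852755-068a-48ce-b7b1-89e60320cc65",
    "playlists" : [
        "0d37b63d-61dc-4bf4-85bf-27e46558f9be",
        "3d56c68d-54c9-47ed-b863-8d67748b5f1c",
        "ca97bbde-e0fe-4c50-8225-0dbfcc12b431"
    ]
}

# Saving the document in the cache.
response = requests.post("http://localhost:5000/save", json=payload)
print(response.json())  # {'message': 'Saved!'}

# Reading it back within the 30-second lifetime.
response = requests.get("http://localhost:5000/cache",
                        json={"user_id" : payload["user_id"]})
print(response.status_code, response.json())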

Now, to implement the structure from Figure 4, just make a copy of the Flask service in another folder, change the ports of the Memcached clients to the remaining ones from the docker-compose file, and run the service itself on another port, as shown below:

app.run(port=6000)
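With both gateways running, the client-side logic from Figure 4, writing to both clusters and falling back on reads, could be sketched as below; the two URLs assume Flask’s default port 5000 for the first gateway and port 6000 for the second:

import requests

# One gateway per caching cluster.
GATEWAYS = ["http://localhost:5000", "http://localhost:6000"]

def save_to_cache(document):
    # Sending one save request per cluster, so each cluster holds a copy.
    for gateway in GATEWAYS:
        requests.post(f"{gateway}/save", json=document)

def get_from_cache(user_id):
    # Trying the clusters in order, falling back on errors or misses.
    for gateway in GATEWAYS:
        try:
            response = requests.get(f"{gateway}/cache",
                                    json={"user_id" : user_id})
            if response.status_code == 200:
                return response.json()
        except requests.exceptions.ConnectionError:
            continue
    return None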

Conclusion.

Caching is a very important concept and functionality for any distributed system. It enhances the speed with which the application can work and, if applied correctly, also saves a lot of computational resources. In the following article in the series, I will show how to easily take full advantage of having two or more service clusters.

The full commented code of the service can be found here: GitHub

Written by Păpăluță Vasile
LinkedIn: https://www.linkedin.com/in/vasile-p%C4%83p%C4%83lu%C8%9B%C4%83/
Instagram: https://www.instagram.com/science_kot/
GitHub: https://github.com/ScienceKot
